You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
662 lines
17 KiB
662 lines
17 KiB
// Package client (v2) is the current official Go client for InfluxDB. |
|
package client // import "github.com/influxdata/influxdb/client/v2" |
|
|
|
import ( |
|
"bytes" |
|
"crypto/tls" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"io/ioutil" |
|
"mime" |
|
"net/http" |
|
"net/url" |
|
"path" |
|
"strconv" |
|
"strings" |
|
"time" |
|
|
|
"github.com/influxdata/influxdb/models" |
|
) |
|
|
|
// HTTPConfig is the config data needed to create an HTTP Client. |
|
type HTTPConfig struct { |
|
// Addr should be of the form "http://host:port" |
|
// or "http://[ipv6-host%zone]:port". |
|
Addr string |
|
|
|
// Username is the influxdb username, optional. |
|
Username string |
|
|
|
// Password is the influxdb password, optional. |
|
Password string |
|
|
|
// UserAgent is the http User Agent, defaults to "InfluxDBClient". |
|
UserAgent string |
|
|
|
// Timeout for influxdb writes, defaults to no timeout. |
|
Timeout time.Duration |
|
|
|
// InsecureSkipVerify gets passed to the http client, if true, it will |
|
// skip https certificate verification. Defaults to false. |
|
InsecureSkipVerify bool |
|
|
|
// TLSConfig allows the user to set their own TLS config for the HTTP |
|
// Client. If set, this option overrides InsecureSkipVerify. |
|
TLSConfig *tls.Config |
|
|
|
// Proxy configures the Proxy function on the HTTP client. |
|
Proxy func(req *http.Request) (*url.URL, error) |
|
} |
|
|
|
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct. |
|
type BatchPointsConfig struct { |
|
// Precision is the write precision of the points, defaults to "ns". |
|
Precision string |
|
|
|
// Database is the database to write points to. |
|
Database string |
|
|
|
// RetentionPolicy is the retention policy of the points. |
|
RetentionPolicy string |
|
|
|
// Write consistency is the number of servers required to confirm write. |
|
WriteConsistency string |
|
} |
|
|
|
// Client is a client interface for writing & querying the database. |
|
type Client interface { |
|
// Ping checks that status of cluster, and will always return 0 time and no |
|
// error for UDP clients. |
|
Ping(timeout time.Duration) (time.Duration, string, error) |
|
|
|
// Write takes a BatchPoints object and writes all Points to InfluxDB. |
|
Write(bp BatchPoints) error |
|
|
|
// Query makes an InfluxDB Query on the database. This will fail if using |
|
// the UDP client. |
|
Query(q Query) (*Response, error) |
|
|
|
// Close releases any resources a Client may be using. |
|
Close() error |
|
} |
|
|
|
// NewHTTPClient returns a new Client from the provided config. |
|
// Client is safe for concurrent use by multiple goroutines. |
|
func NewHTTPClient(conf HTTPConfig) (Client, error) { |
|
if conf.UserAgent == "" { |
|
conf.UserAgent = "InfluxDBClient" |
|
} |
|
|
|
u, err := url.Parse(conf.Addr) |
|
if err != nil { |
|
return nil, err |
|
} else if u.Scheme != "http" && u.Scheme != "https" { |
|
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+ |
|
" must start with http:// or https://", u.Scheme) |
|
return nil, errors.New(m) |
|
} |
|
|
|
tr := &http.Transport{ |
|
TLSClientConfig: &tls.Config{ |
|
InsecureSkipVerify: conf.InsecureSkipVerify, |
|
}, |
|
Proxy: conf.Proxy, |
|
} |
|
if conf.TLSConfig != nil { |
|
tr.TLSClientConfig = conf.TLSConfig |
|
} |
|
return &client{ |
|
url: *u, |
|
username: conf.Username, |
|
password: conf.Password, |
|
useragent: conf.UserAgent, |
|
httpClient: &http.Client{ |
|
Timeout: conf.Timeout, |
|
Transport: tr, |
|
}, |
|
transport: tr, |
|
}, nil |
|
} |
|
|
|
// Ping will check to see if the server is up with an optional timeout on waiting for leader. |
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. |
|
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { |
|
now := time.Now() |
|
|
|
u := c.url |
|
u.Path = path.Join(u.Path, "ping") |
|
|
|
req, err := http.NewRequest("GET", u.String(), nil) |
|
if err != nil { |
|
return 0, "", err |
|
} |
|
|
|
req.Header.Set("User-Agent", c.useragent) |
|
|
|
if c.username != "" { |
|
req.SetBasicAuth(c.username, c.password) |
|
} |
|
|
|
if timeout > 0 { |
|
params := req.URL.Query() |
|
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds())) |
|
req.URL.RawQuery = params.Encode() |
|
} |
|
|
|
resp, err := c.httpClient.Do(req) |
|
if err != nil { |
|
return 0, "", err |
|
} |
|
defer resp.Body.Close() |
|
|
|
body, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
return 0, "", err |
|
} |
|
|
|
if resp.StatusCode != http.StatusNoContent { |
|
var err = fmt.Errorf(string(body)) |
|
return 0, "", err |
|
} |
|
|
|
version := resp.Header.Get("X-Influxdb-Version") |
|
return time.Since(now), version, nil |
|
} |
|
|
|
// Close releases the client's resources. |
|
func (c *client) Close() error { |
|
c.transport.CloseIdleConnections() |
|
return nil |
|
} |
|
|
|
// client is safe for concurrent use as the fields are all read-only |
|
// once the client is instantiated. |
|
type client struct { |
|
// N.B - if url.UserInfo is accessed in future modifications to the |
|
// methods on client, you will need to synchronize access to url. |
|
url url.URL |
|
username string |
|
password string |
|
useragent string |
|
httpClient *http.Client |
|
transport *http.Transport |
|
} |
|
|
|
// BatchPoints is an interface into a batched grouping of points to write into |
|
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate |
|
// batch for each goroutine. |
|
type BatchPoints interface { |
|
// AddPoint adds the given point to the Batch of points. |
|
AddPoint(p *Point) |
|
// AddPoints adds the given points to the Batch of points. |
|
AddPoints(ps []*Point) |
|
// Points lists the points in the Batch. |
|
Points() []*Point |
|
|
|
// Precision returns the currently set precision of this Batch. |
|
Precision() string |
|
// SetPrecision sets the precision of this batch. |
|
SetPrecision(s string) error |
|
|
|
// Database returns the currently set database of this Batch. |
|
Database() string |
|
// SetDatabase sets the database of this Batch. |
|
SetDatabase(s string) |
|
|
|
// WriteConsistency returns the currently set write consistency of this Batch. |
|
WriteConsistency() string |
|
// SetWriteConsistency sets the write consistency of this Batch. |
|
SetWriteConsistency(s string) |
|
|
|
// RetentionPolicy returns the currently set retention policy of this Batch. |
|
RetentionPolicy() string |
|
// SetRetentionPolicy sets the retention policy of this Batch. |
|
SetRetentionPolicy(s string) |
|
} |
|
|
|
// NewBatchPoints returns a BatchPoints interface based on the given config. |
|
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) { |
|
if conf.Precision == "" { |
|
conf.Precision = "ns" |
|
} |
|
if _, err := time.ParseDuration("1" + conf.Precision); err != nil { |
|
return nil, err |
|
} |
|
bp := &batchpoints{ |
|
database: conf.Database, |
|
precision: conf.Precision, |
|
retentionPolicy: conf.RetentionPolicy, |
|
writeConsistency: conf.WriteConsistency, |
|
} |
|
return bp, nil |
|
} |
|
|
|
type batchpoints struct { |
|
points []*Point |
|
database string |
|
precision string |
|
retentionPolicy string |
|
writeConsistency string |
|
} |
|
|
|
func (bp *batchpoints) AddPoint(p *Point) { |
|
bp.points = append(bp.points, p) |
|
} |
|
|
|
func (bp *batchpoints) AddPoints(ps []*Point) { |
|
bp.points = append(bp.points, ps...) |
|
} |
|
|
|
func (bp *batchpoints) Points() []*Point { |
|
return bp.points |
|
} |
|
|
|
func (bp *batchpoints) Precision() string { |
|
return bp.precision |
|
} |
|
|
|
func (bp *batchpoints) Database() string { |
|
return bp.database |
|
} |
|
|
|
func (bp *batchpoints) WriteConsistency() string { |
|
return bp.writeConsistency |
|
} |
|
|
|
func (bp *batchpoints) RetentionPolicy() string { |
|
return bp.retentionPolicy |
|
} |
|
|
|
func (bp *batchpoints) SetPrecision(p string) error { |
|
if _, err := time.ParseDuration("1" + p); err != nil { |
|
return err |
|
} |
|
bp.precision = p |
|
return nil |
|
} |
|
|
|
func (bp *batchpoints) SetDatabase(db string) { |
|
bp.database = db |
|
} |
|
|
|
func (bp *batchpoints) SetWriteConsistency(wc string) { |
|
bp.writeConsistency = wc |
|
} |
|
|
|
func (bp *batchpoints) SetRetentionPolicy(rp string) { |
|
bp.retentionPolicy = rp |
|
} |
|
|
|
// Point represents a single data point. |
|
type Point struct { |
|
pt models.Point |
|
} |
|
|
|
// NewPoint returns a point with the given timestamp. If a timestamp is not |
|
// given, then data is sent to the database without a timestamp, in which case |
|
// the server will assign local time upon reception. NOTE: it is recommended to |
|
// send data with a timestamp. |
|
func NewPoint( |
|
name string, |
|
tags map[string]string, |
|
fields map[string]interface{}, |
|
t ...time.Time, |
|
) (*Point, error) { |
|
var T time.Time |
|
if len(t) > 0 { |
|
T = t[0] |
|
} |
|
|
|
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return &Point{ |
|
pt: pt, |
|
}, nil |
|
} |
|
|
|
// String returns a line-protocol string of the Point. |
|
func (p *Point) String() string { |
|
return p.pt.String() |
|
} |
|
|
|
// PrecisionString returns a line-protocol string of the Point, |
|
// with the timestamp formatted for the given precision. |
|
func (p *Point) PrecisionString(precision string) string { |
|
return p.pt.PrecisionString(precision) |
|
} |
|
|
|
// Name returns the measurement name of the point. |
|
func (p *Point) Name() string { |
|
return string(p.pt.Name()) |
|
} |
|
|
|
// Tags returns the tags associated with the point. |
|
func (p *Point) Tags() map[string]string { |
|
return p.pt.Tags().Map() |
|
} |
|
|
|
// Time return the timestamp for the point. |
|
func (p *Point) Time() time.Time { |
|
return p.pt.Time() |
|
} |
|
|
|
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch. |
|
func (p *Point) UnixNano() int64 { |
|
return p.pt.UnixNano() |
|
} |
|
|
|
// Fields returns the fields for the point. |
|
func (p *Point) Fields() (map[string]interface{}, error) { |
|
return p.pt.Fields() |
|
} |
|
|
|
// NewPointFrom returns a point from the provided models.Point. |
|
func NewPointFrom(pt models.Point) *Point { |
|
return &Point{pt: pt} |
|
} |
|
|
|
func (c *client) Write(bp BatchPoints) error { |
|
var b bytes.Buffer |
|
|
|
for _, p := range bp.Points() { |
|
if p == nil { |
|
continue |
|
} |
|
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { |
|
return err |
|
} |
|
|
|
if err := b.WriteByte('\n'); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
u := c.url |
|
u.Path = path.Join(u.Path, "write") |
|
|
|
req, err := http.NewRequest("POST", u.String(), &b) |
|
if err != nil { |
|
return err |
|
} |
|
req.Header.Set("Content-Type", "") |
|
req.Header.Set("User-Agent", c.useragent) |
|
if c.username != "" { |
|
req.SetBasicAuth(c.username, c.password) |
|
} |
|
|
|
params := req.URL.Query() |
|
params.Set("db", bp.Database()) |
|
params.Set("rp", bp.RetentionPolicy()) |
|
params.Set("precision", bp.Precision()) |
|
params.Set("consistency", bp.WriteConsistency()) |
|
req.URL.RawQuery = params.Encode() |
|
|
|
resp, err := c.httpClient.Do(req) |
|
if err != nil { |
|
return err |
|
} |
|
defer resp.Body.Close() |
|
|
|
body, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { |
|
var err = fmt.Errorf(string(body)) |
|
return err |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// Query defines a query to send to the server. |
|
type Query struct { |
|
Command string |
|
Database string |
|
RetentionPolicy string |
|
Precision string |
|
Chunked bool |
|
ChunkSize int |
|
Parameters map[string]interface{} |
|
} |
|
|
|
// NewQuery returns a query object. |
|
// The database and precision arguments can be empty strings if they are not needed for the query. |
|
func NewQuery(command, database, precision string) Query { |
|
return Query{ |
|
Command: command, |
|
Database: database, |
|
Precision: precision, |
|
Parameters: make(map[string]interface{}), |
|
} |
|
} |
|
|
|
// NewQueryWithRP returns a query object. |
|
// The database, retention policy, and precision arguments can be empty strings if they are not needed |
|
// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater. |
|
func NewQueryWithRP(command, database, retentionPolicy, precision string) Query { |
|
return Query{ |
|
Command: command, |
|
Database: database, |
|
RetentionPolicy: retentionPolicy, |
|
Precision: precision, |
|
Parameters: make(map[string]interface{}), |
|
} |
|
} |
|
|
|
// NewQueryWithParameters returns a query object. |
|
// The database and precision arguments can be empty strings if they are not needed for the query. |
|
// parameters is a map of the parameter names used in the command to their values. |
|
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query { |
|
return Query{ |
|
Command: command, |
|
Database: database, |
|
Precision: precision, |
|
Parameters: parameters, |
|
} |
|
} |
|
|
|
// Response represents a list of statement results. |
|
type Response struct { |
|
Results []Result |
|
Err string `json:"error,omitempty"` |
|
} |
|
|
|
// Error returns the first error from any statement. |
|
// It returns nil if no errors occurred on any statements. |
|
func (r *Response) Error() error { |
|
if r.Err != "" { |
|
return fmt.Errorf(r.Err) |
|
} |
|
for _, result := range r.Results { |
|
if result.Err != "" { |
|
return fmt.Errorf(result.Err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Message represents a user message. |
|
type Message struct { |
|
Level string |
|
Text string |
|
} |
|
|
|
// Result represents a resultset returned from a single statement. |
|
type Result struct { |
|
Series []models.Row |
|
Messages []*Message |
|
Err string `json:"error,omitempty"` |
|
} |
|
|
|
// Query sends a command to the server and returns the Response. |
|
func (c *client) Query(q Query) (*Response, error) { |
|
u := c.url |
|
u.Path = path.Join(u.Path, "query") |
|
|
|
jsonParameters, err := json.Marshal(q.Parameters) |
|
|
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
req, err := http.NewRequest("POST", u.String(), nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
req.Header.Set("Content-Type", "") |
|
req.Header.Set("User-Agent", c.useragent) |
|
|
|
if c.username != "" { |
|
req.SetBasicAuth(c.username, c.password) |
|
} |
|
|
|
params := req.URL.Query() |
|
params.Set("q", q.Command) |
|
params.Set("db", q.Database) |
|
if q.RetentionPolicy != "" { |
|
params.Set("rp", q.RetentionPolicy) |
|
} |
|
params.Set("params", string(jsonParameters)) |
|
if q.Chunked { |
|
params.Set("chunked", "true") |
|
if q.ChunkSize > 0 { |
|
params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) |
|
} |
|
} |
|
|
|
if q.Precision != "" { |
|
params.Set("epoch", q.Precision) |
|
} |
|
req.URL.RawQuery = params.Encode() |
|
|
|
resp, err := c.httpClient.Do(req) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer resp.Body.Close() |
|
|
|
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb |
|
// but instead some other service. If the error code is also a 500+ code, then some |
|
// downstream loadbalancer/proxy/etc had an issue and we should report that. |
|
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { |
|
body, err := ioutil.ReadAll(resp.Body) |
|
if err != nil || len(body) == 0 { |
|
return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode) |
|
} |
|
|
|
return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) |
|
} |
|
|
|
// If we get an unexpected content type, then it is also not from influx direct and therefore |
|
// we want to know what we received and what status code was returned for debugging purposes. |
|
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { |
|
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things |
|
// like downstream serving a large file |
|
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) |
|
if err != nil || len(body) == 0 { |
|
return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) |
|
} |
|
|
|
return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) |
|
} |
|
|
|
var response Response |
|
if q.Chunked { |
|
cr := NewChunkedResponse(resp.Body) |
|
for { |
|
r, err := cr.NextResponse() |
|
if err != nil { |
|
// If we got an error while decoding the response, send that back. |
|
return nil, err |
|
} |
|
|
|
if r == nil { |
|
break |
|
} |
|
|
|
response.Results = append(response.Results, r.Results...) |
|
if r.Err != "" { |
|
response.Err = r.Err |
|
break |
|
} |
|
} |
|
} else { |
|
dec := json.NewDecoder(resp.Body) |
|
dec.UseNumber() |
|
decErr := dec.Decode(&response) |
|
|
|
// ignore this error if we got an invalid status code |
|
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { |
|
decErr = nil |
|
} |
|
// If we got a valid decode error, send that back |
|
if decErr != nil { |
|
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) |
|
} |
|
} |
|
|
|
// If we don't have an error in our json response, and didn't get statusOK |
|
// then send back an error |
|
if resp.StatusCode != http.StatusOK && response.Error() == nil { |
|
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode) |
|
} |
|
return &response, nil |
|
} |
|
|
|
// duplexReader reads responses and writes it to another writer while |
|
// satisfying the reader interface. |
|
type duplexReader struct { |
|
r io.Reader |
|
w io.Writer |
|
} |
|
|
|
func (r *duplexReader) Read(p []byte) (n int, err error) { |
|
n, err = r.r.Read(p) |
|
if err == nil { |
|
r.w.Write(p[:n]) |
|
} |
|
return n, err |
|
} |
|
|
|
// ChunkedResponse represents a response from the server that |
|
// uses chunking to stream the output. |
|
type ChunkedResponse struct { |
|
dec *json.Decoder |
|
duplex *duplexReader |
|
buf bytes.Buffer |
|
} |
|
|
|
// NewChunkedResponse reads a stream and produces responses from the stream. |
|
func NewChunkedResponse(r io.Reader) *ChunkedResponse { |
|
resp := &ChunkedResponse{} |
|
resp.duplex = &duplexReader{r: r, w: &resp.buf} |
|
resp.dec = json.NewDecoder(resp.duplex) |
|
resp.dec.UseNumber() |
|
return resp |
|
} |
|
|
|
// NextResponse reads the next line of the stream and returns a response. |
|
func (r *ChunkedResponse) NextResponse() (*Response, error) { |
|
var response Response |
|
|
|
if err := r.dec.Decode(&response); err != nil { |
|
if err == io.EOF { |
|
return nil, nil |
|
} |
|
// A decoding error happened. This probably means the server crashed |
|
// and sent a last-ditch error message to us. Ensure we have read the |
|
// entirety of the connection to get any remaining error text. |
|
io.Copy(ioutil.Discard, r.duplex) |
|
return nil, errors.New(strings.TrimSpace(r.buf.String())) |
|
} |
|
|
|
r.buf.Reset() |
|
return &response, nil |
|
}
|
|
|