elastic-transport-go's Introduction

elastic-transport-go

This library was lifted from go-elasticsearch and then transformed to be used across all Elastic services rather than only Elasticsearch.

It provides the Transport interface used by go-elasticsearch, connection pool, cluster discovery, and multiple loggers.

Installation

Add the package to your go.mod file:

require github.com/elastic/elastic-transport-go/v8 main

Usage

Transport

The transport provides the basic layer to access Elasticsearch APIs.

package main

import (
	"log"
	"net/http"
	"net/url"

	"github.com/elastic/elastic-transport-go/v8/elastictransport"
)

func main() {
	u, _ := url.Parse("http://127.0.0.1:9200")

	cfg := elastictransport.Config{
		URLs: []*url.URL{u},
	}
	transport, err := elastictransport.New(cfg)
	if err != nil {
		log.Fatalln(err)
	}

	req, _ := http.NewRequest("GET", "/", nil)

	res, err := transport.Perform(req)
	if err != nil {
		log.Fatalln(err)
	}
	defer res.Body.Close()

	log.Println(res)
}

NOTE: It is critical to both consume and close the response body in order to re-use persistent TCP connections in the default HTTP transport. If you're not interested in the response body, call io.Copy(io.Discard, res.Body) (or io.Copy(ioutil.Discard, res.Body) on Go versions before 1.16) before closing it.

Discovery

The discovery module calls the cluster to retrieve its complete list of nodes.

Once your transport has been set up, you can trigger this behavior like so:

err := transport.DiscoverNodes()

Metrics

Metrics can be retrieved directly from the transport.

Loggers

One of several loggers can be injected through the Logger field of the configuration. They are as follows:

TextLogger

config:

cfg := elastictransport.Config{
    Logger: &elastictransport.TextLogger{os.Stdout, true, true},
}

output:

< {
<   "name" : "es",
<   "cluster_name" : "elasticsearch",
<   "cluster_uuid" : "RxB1iqTNT9q3LlIkTsmWRA",
<   "version" : {
<     "number" : "8.0.0-SNAPSHOT",
<     "build_flavor" : "default",
<     "build_type" : "docker",
<     "build_hash" : "0564e027dc6c69236937b1edcc04c207b4cd8128",
<     "build_date" : "2021-11-25T00:23:33.139514432Z",
<     "build_snapshot" : true,
<     "lucene_version" : "9.0.0",
<     "minimum_wire_compatibility_version" : "7.16.0",
<     "minimum_index_compatibility_version" : "7.0.0"
<   },
<   "tagline" : "You Know, for Search"
< }

JSONLogger

config:

cfg := elastictransport.Config{
    Logger: &elastictransport.JSONLogger{os.Stdout, true, true},
}

output:

{
  "@timestamp": "2021-11-25T16:33:51Z",
  "event": {
    "duration": 2892269
  },
  "url": {
    "scheme": "http",
    "domain": "127.0.0.1",
    "port": 9200,
    "path": "/",
    "query": ""
  },
  "http": {
    "request": {
      "method": "GET"
    },
    "response": {
      "status_code": 200,
      "body": "{\n  \"name\" : \"es1\",\n  \"cluster_name\" : \"go-elasticsearch\",\n  \"cluster_uuid\" : \"RxB1iqTNT9q3LlIkTsmWRA\",\n  \"version\" : {\n    \"number\" : \"8.0.0-SNAPSHOT\",\n    \"build_flavor\" : \"default\",\n    \"build_type\" : \"docker\",\n    \"build_hash\" : \"0564e027dc6c69236937b1edcc04c207b4cd8128\",\n    \"build_date\" : \"2021-11-25T00:23:33.139514432Z\",\n    \"build_snapshot\" : true,\n    \"lucene_version\" : \"9.0.0\",\n    \"minimum_wire_compatibility_version\" : \"8.0.0\",\n    \"minimum_index_compatibility_version\" : \"7.0.0\"\n  },\n  \"tagline\" : \"You Know, for Search\"\n}\n"
    }
  }
}

ColorLogger

config:

cfg := elastictransport.Config{
    Logger: &elastictransport.ColorLogger{os.Stdout, true, true},
}

output:

GET http://127.0.0.1:9200/ 200 OK 2ms
« {
«   "name" : "es1",
«   "cluster_name" : "go-elasticsearch",
«   "cluster_uuid" : "RxB1iqTNT9q3LlIkTsmWRA",
«   "version" : {
«     "number" : "8.0.0-SNAPSHOT",
«     "build_flavor" : "default",
«     "build_type" : "docker",
«     "build_hash" : "0564e027dc6c69236937b1edcc04c207b4cd8128",
«     "build_date" : "2021-11-25T00:23:33.139514432Z",
«     "build_snapshot" : true,
«     "lucene_version" : "9.0.0",
«     "minimum_wire_compatibility_version" : "7.16.0",
«     "minimum_index_compatibility_version" : "7.0.0"
«   },
«   "tagline" : "You Know, for Search"
« }
────────────────────────────────────────────────────────────────────────────────

CurlLogger

config:

cfg := elastictransport.Config{
    Logger: &elastictransport.CurlLogger{os.Stdout, true, true},
}

output:

curl -X GET 'http://localhost:9200/?pretty'
# => 2021-11-25T16:40:11Z [200 OK] 3ms
# {
#  "name": "es1",
#  "cluster_name": "go-elasticsearch",
#  "cluster_uuid": "RxB1iqTNT9q3LlIkTsmWRA",
#  "version": {
#   "number": "8.0.0-SNAPSHOT",
#   "build_flavor": "default",
#   "build_type": "docker",
#   "build_hash": "0564e027dc6c69236937b1edcc04c207b4cd8128",
#   "build_date": "2021-11-25T00:23:33.139514432Z",
#   "build_snapshot": true,
#   "lucene_version": "9.0.0",
#   "minimum_wire_compatibility_version": "7.16.0",
#   "minimum_index_compatibility_version": "7.0.0"
#  },
#  "tagline": "You Know, for Search"
# }

License

Licensed under the Apache License, Version 2.0.

elastic-transport-go's People

Contributors

anaethelion, pakio, rockdaboot

elastic-transport-go's Issues

Deadlock in statusConnectionPool

A deadlock occurs in statusConnectionPool.OnFailure.

https://github.com/elastic/elastic-transport-go/blob/v8.5.0/elastictransport/connection.go#L184

	sort.Slice(cp.dead, func(i, j int) bool {
		c1 := cp.dead[i]
		c2 := cp.dead[j]
		c1.Lock()
		c2.Lock() // <- here
		defer c1.Unlock()
		defer c2.Unlock()

		res := c1.Failures > c2.Failures
		return res
	})

Here is my hypothesis:

	/*
		Initially using Pool 1

			Goroutine 1:                                        |  DiscoverNodes()  | Pool 2
			Goroutine 2: get Pool 1 node        OnFailure()     |                   |  ...............{resurrects on Pool 1}
			Goroutine 3: get Pool 1 node                        |                   |  OnFailure()
			Goroutine 4: get Pool 1 node                        |                   |  ....................................OnFailure()
	*/
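One way to read this hypothesis, stated here as an assumption rather than a confirmed diagnosis: if the same *Connection ends up in the dead list twice, the comparator can receive the same pointer as both c1 and c2, and the second Lock never returns because sync.Mutex is not reentrant. A minimal sketch with a hypothetical conn type; it uses TryLock (Go 1.18+) so the demonstration itself does not hang.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for the library's Connection type.
type conn struct {
	sync.Mutex
	Failures int
}

// wouldSelfDeadlock reports whether locking c1 and then c2, as the sort
// comparator does, would block forever in a single goroutine.
// TryLock is used for the second lock so this function always returns.
func wouldSelfDeadlock(c1, c2 *conn) bool {
	c1.Lock()
	defer c1.Unlock()
	if !c2.TryLock() {
		// c2 is already locked. In this single-goroutine demo that can
		// only mean c1 and c2 are the same connection.
		return true
	}
	c2.Unlock()
	return false
}

func main() {
	a, b := &conn{Failures: 1}, &conn{Failures: 2}
	dead := []*conn{a, b, a} // assumption: a was pushed to the list twice

	fmt.Println(wouldSelfDeadlock(dead[0], dead[1])) // false
	fmt.Println(wouldSelfDeadlock(dead[0], dead[2])) // true
}
```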

Here is my test function:

package elastictransport

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"sync"
	"testing"
	"time"
)

type ctxKey int

const shouldError, onTransport = ctxKey(1), ctxKey(2)

var someErr = errors.New("some error")

type mockTransport struct {
	t *testing.T
}

func (m *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	switch req.URL.Path {
	case "/_nodes/http":
		n := nodeInfo{}
		n.HTTP.PublishAddress = "localhost:1234"
		n2 := nodeInfo{}
		n2.HTTP.PublishAddress = "localhost:5678"
		return m.fromJson(map[string]map[string]nodeInfo{
			"nodes": {
				"es0": n,
				"es1": n2,
			},
		}), nil
	default:
		if f, ok := req.Context().Value(onTransport).(func()); ok {
			f()
		}
		if y, _ := req.Context().Value(shouldError).(bool); y {
			return nil, someErr
		}
		return &http.Response{
			StatusCode: 200,
			Body:       io.NopCloser(bytes.NewReader(nil)),
		}, nil
	}
}

func (m *mockTransport) fromJson(v interface{}) *http.Response {
	b, err := json.Marshal(v)
	if err != nil {
		m.t.Fatal(err)
	}
	return &http.Response{
		StatusCode: 200,
		Body:       io.NopCloser(bytes.NewReader(b)),
	}
}

type lg struct{}

func (lg) Log(a ...interface{}) error {
	fmt.Println(a...)
	return nil
}
func (lg) Logf(format string, a ...interface{}) error {
	fmt.Printf(format, a...)
	return nil
}

func TestDeadlock(t *testing.T) {
	go func() {
		http.ListenAndServe(":8080", nil)
	}()
	debugLogger = lg{}
	defaultResurrectTimeoutInitial = 10 * time.Second

	var (
		transport http.RoundTripper = &mockTransport{t}
		makeReq                     = func(ctx context.Context) *http.Request {
			t.Helper()
			req, err := http.NewRequestWithContext(ctx, "GET", "", io.NopCloser(bytes.NewReader(nil)))
			if err != nil {
				t.Fatal(err)
			}
			return req
		}
	)
	u, err := url.Parse("http://localhost:9200") // just the initial node
	if err != nil {
		t.Fatal(err)
	}

	c, err := New(Config{
		Transport:    transport,
		URLs:         []*url.URL{u, u},
		DisableRetry: true,
	})
	if err != nil {
		t.Fatal(err)
	}

	c.DiscoverNodes()

	/*
		Initially using Pool 1

			Goroutine 1:                                        |  DiscoverNodes()  | Pool 2
			Goroutine 2: get Pool 1 node        OnFailure()     |                   |  ...............{resurrects on Pool 1}
			Goroutine 3: get Pool 1 node                        |                   |  OnFailure()
			Goroutine 4: get Pool 1 node                        |                   |  ....................................OnFailure()
	*/

	var (
		firstPhaseWg,
		secondPhaseWg,
		thirdPhaseWg,
		getPoolWg,
		discoverNodesWg sync.WaitGroup
	)

	var firstPhases = func(c *Client) {
		defer firstPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
		})
		c.Perform(makeReq(ctx))
	}

	var discoverNodesPhase = func(c *Client) {
		defer discoverNodesWg.Done()
		getPoolWg.Wait()
		c.DiscoverNodes()
	}

	var secondPhases = func(c *Client) {
		defer secondPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
			firstPhaseWg.Wait()
			discoverNodesWg.Wait()
		})
		c.Perform(makeReq(ctx))
	}

	var thirdPhases = func(c *Client) {
		defer thirdPhaseWg.Done()
		ctx := context.Background()
		ctx = context.WithValue(ctx, shouldError, true)
		ctx = context.WithValue(ctx, onTransport, func() {
			getPoolWg.Done()
			secondPhaseWg.Wait()
			time.Sleep(10 * time.Second)
		})
		c.Perform(makeReq(ctx))
	}

	runPar := func(n int, wg []*sync.WaitGroup, f func(*Client), c *Client) {
		for _, w := range wg {
			w.Add(n)
		}
		for i := 0; i < n; i++ {
			go f(c)
		}
	}

	var N = 10
	runPar(N, []*sync.WaitGroup{&firstPhaseWg, &getPoolWg}, firstPhases, c)
	runPar(N, []*sync.WaitGroup{&secondPhaseWg, &getPoolWg}, secondPhases, c)
	runPar(N, []*sync.WaitGroup{&thirdPhaseWg, &getPoolWg}, thirdPhases, c)
	runPar(1, []*sync.WaitGroup{&discoverNodesWg}, discoverNodesPhase, c)

	thirdPhaseWg.Wait()

	fmt.Println("pool urls", c.pool.URLs())
	fmt.Println("deads", c.pool.(*statusConnectionPool).dead)
}

I can consistently get it to deadlock, and when I open http://localhost:8080/debug/pprof/goroutine?debug=1 I get this profile:

1 @ 0x43b356 0x44ca0f 0x44c9e6 0x4694e6 0x474065 0x73d745 0x73d71d 0x490251 0x49083d 0x48fd9a 0x73d429 0x741c09 0x74bda9 0x46d701
#	0x4694e5	sync.runtime_SemacquireMutex+0x25										/home/<user>/go1.20.11/go/src/runtime/sema.go:77
#	0x474064	sync.(*Mutex).lockSlow+0x164											/home/<user>/go1.20.11/go/src/sync/mutex.go:171
#	0x73d744	sync.(*Mutex).Lock+0xa4												/home/<user>/go1.20.11/go/src/sync/mutex.go:90
#	0x73d71c	github.com/elastic/elastic-transport-go/v8/elastictransport.(*statusConnectionPool).OnFailure.func1+0x7c	/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/connection.go:184
#	0x490250	sort.insertionSort_func+0xb0											/home/<user>/go1.20.11/go/src/sort/zsortfunc.go:12
#	0x49083c	sort.pdqsort_func+0x2dc												/home/<user>/go1.20.11/go/src/sort/zsortfunc.go:73
#	0x48fd99	sort.Slice+0xf9													/home/<user>/go1.20.11/go/src/sort/slice.go:26
#	0x73d428	github.com/elastic/elastic-transport-go/v8/elastictransport.(*statusConnectionPool).OnFailure+0x268		/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/connection.go:180
#	0x741c08	github.com/elastic/elastic-transport-go/v8/elastictransport.(*Client).Perform+0x9a8				/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/elastictransport.go:386
#	0x74bda8	github.com/elastic/elastic-transport-go/v8/elastictransport.TestDeadlock.func6+0x148				/home/<user>/github.com/elastic/elastic-transport-go/elastictransport/a_test.go:172

Suggested fix: in *Client.Perform:

  1. Save the pool to a local variable when calling .Next()
  2. Use the local var when calling OnSuccess() or OnFailure()

Thanks.

Failover on TLS errors

At present, estransport will retry a request with another host if the request fails with io.EOF, some network errors, or on configurable response status codes:

https://github.com/elastic/go-elasticsearch/blob/75263a5e99d25479728f8a6ec9151d347d87370f/estransport/estransport.go#L384-L416

This is helpful when one host becomes inaccessible.

We would like to configure an application with multiple Elasticsearch hosts, where each host is/should be accessible but only some have verifiable TLS certificates. What we would like is for requests that fail TLS verification to be retried with another host. At the moment this does not happen, as these errors are not network errors.

More generally, I wonder if we should reverse the logic of retries: retry by default, and exclude some errors.

Logger Output and Specifying Host Headers

I am using the Logger to check how to specify the Host header, but the output differs depending on which Logger is used.
Also, I am not sure whether I am specifying the Host header correctly.
Source code

package main

import (
	"os"
	"log"
	"net/http"
	"crypto/tls"

	elasticsearch "github.com/elastic/go-elasticsearch/v7"
	estransport "github.com/elastic/go-elasticsearch/v7/estransport"
)

func main() {
	cfg := elasticsearch.Config{
		Addresses: []string{"https://ssh-tunnel:19200/"},
		Username:  "elastic",
		Password:  "password",
		Header:    http.Header{"Host": []string{"somewhere.es.asia-northeast1.gcp.cloud.es.io"}},
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
		Logger: &estransport.TextLogger{
		//Logger: &estransport.JSONLogger{
		//Logger: &estransport.CurlLogger{
			Output:             os.Stdout,
			EnableRequestBody:  true,
			EnableResponseBody: true,
		},
		EnableDebugLogger: true,
	}
	es, _ := elasticsearch.NewClient(cfg)
	log.Println(es.Info())
}

Output of TextLogger

golang@c8b920b337fb:~/some_backend/src$ go run main.go
2022-03-18T05:48:12Z GET https://ssh-tunnel:19200/ [status:404 request:182ms]
< {"ok":false,"message":"Unknown resource."}
2022/03/18 14:48:12 [0 <nil>] cannot retrieve informations from Elasticsearch

Output of JSONLogger

golang@c8b920b337fb:~/some_backend/src$ go run main.go
{"@timestamp":"2022-03-18T05:48:58Z","event":{"duration":332604100},"url":{"scheme":"https","domain":"ssh-tunnel","port":19200,"path":"/","query":""},"http":{"request":{"method":"GET"},"response":{"status_code":404,"body":"{\"ok\":false,\"message\":\"Unknown resource.\"}\n"}}}
2022/03/18 14:48:58 [0 <nil>] cannot retrieve informations from Elasticsearch

Output of CurlLogger

golang@c8b920b337fb:~/some_backend/src$ go run main.go
curl -X GET -H 'Host: somewhere.es.asia-northeast1.gcp.cloud.es.io' -H 'X-Elastic-Client-Meta: es=7.17.0,go=1.17.6,t=7.17.0,hc=1.17.6' 'http://localhost:9200/?pretty'
# => 2022-03-18T05:46:51Z [404 Not Found] 224ms
# {
#  "ok": false,
#  "message": "Unknown resource."
# }



2022/03/18 14:46:51 [0 <nil>] cannot retrieve informations from Elasticsearch

When using CurlLogger, https://ssh-tunnel:19200/ is replaced by http://localhost:9200 in the output.
Without go-elasticsearch, the same connection returns the expected 200 OK.
The source code and response are as follows.
Source code

package main

import (
    "fmt"
    "log"
    "net/http"
    "net/http/httputil"
    "crypto/tls"
)

func main() {
    req, err := http.NewRequest(http.MethodGet, "https://ssh-tunnel:19200/", nil)
    if err != nil {
        log.Fatal(err)
    }

    req.Host = "somewhere.es.asia-northeast1.gcp.cloud.es.io"
    req.SetBasicAuth("elastic", "password")

    client := &http.Client{}
    client.Transport = &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
    }

    res, err := client.Do(req)
    if err != nil {
        log.Fatal(err)
    }

    dump, _ := httputil.DumpResponse(res, true)
    fmt.Printf("%q", dump)
}

Output

golang@c8b920b337fb:~/some_backend/src$ go run main.go
"HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=UTF-8\r\nDate: Fri, 18 Mar 2022 05:59:29 GMT\r\nX-Cloud-Request-Id: AU_0RjIkSXeNDrC2mMNGUw\r\nX-Elastic-Product: Elasticsearch\r\nX-Found-Handling-Cluster: 7dc5c050851046e69a4a97184c94f5fc\r\nX-Found-Handling-Instance: instance-0000000000\r\n\r\n{\n  \"name\" : \"instance-0000000000\",\n  \"cluster_name\" : \"7dc5c050851046e69a4a97184c94f5fc\",\n  \"cluster_uuid\" : \"_FHW7jXNQkGXsL7oTyCdlg\",\n  \"version\" : {\n    \"number\" : \"7.16.3\",\n    \"build_flavor\" : \"default\",\n    \"build_type\" : \"docker\",\n    \"build_hash\" : \"4e6e4eab2297e949ec994e688dad46290d018022\",\n    \"build_date\" : \"2022-01-06T23:43:02.825887787Z\",\n    \"build_snapshot\" : false,\n    \"lucene_version\" : \"8.10.1\",\n    \"minimum_wire_compatibility_version\" : \"6.8.0\",\n    \"minimum_index_compatibility_version\" : \"6.0.0-beta1\"\n  },\n  \"tagline\" : \"You Know, for Search\"\n}\n"

Improve handling of TLS certificate pinning

elastic/go-elasticsearch#358 added certificate pinning when talking to Elasticsearch. The current implementation has some limitations though:

  • Certificate pinning is applied only to *http.Transport instances. Transports that are wrapped by other logic, for example by the APM Go agent, are excluded.
  • When pinning is applied, the configured DialTLS function is replaced by a go-elasticsearch function that only checks the TLS fingerprint (https://github.com/elastic/go-elasticsearch/blob/main/estransport/estransport.go#L157-L181).
  • InsecureSkipVerify is set to true when checking the fingerprint, so any configured CA certificates are not considered at this point.
  • Only one fingerprint is supported.
