Skip to content
Snippets Groups Projects
Commit 0fcd3b58 authored by Michael Zigldrum's avatar Michael Zigldrum
Browse files

Merge branch 'refactor-client' into 'main'

Refactor client

Closes #5

See merge request eclipse/xfsc/libraries/messaging/cloudeventprovider!6
parents 895bf731 ce04c74f
No related branches found
No related tags found
No related merge requests found
Showing
with 524 additions and 972 deletions
......@@ -2,6 +2,7 @@ package cloudeventprovider
import (
"context"
"errors"
"fmt"
"time"
......@@ -20,28 +21,35 @@ type CloudEventProviderClient struct {
alive bool
}
// newClient ignores topic parameter for http connections
func newClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
protocol := currentCloudEventConfig.Messaging.Protocol
var ErrInvalidConfig = errors.New("invalid config (type of Config.Settings does not match Config.Protocol)")
// newClient ignores topic parameter for http connections
func newClient(conf Config, connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
ctx := context.Background()
if conf.Settings == nil {
return nil, errors.New("Config.Settings is nil")
}
var connection interface{}
var err error
if protocol == Http {
httpConfig := currentCloudEventConfig.Messaging.Http
if conf.Protocol == ProtocolTypeHttp {
httpConfig, ok := conf.Settings.(HttpConfig)
if !ok {
return nil, ErrInvalidConfig
}
switch connectionType {
case Pub, Req:
case ConnectionTypePub, ConnectionTypeReq:
connection, err = cloudevents.NewHTTP(cloudevents.WithTarget(httpConfig.Url))
case Sub, Rep:
case ConnectionTypeSub, ConnectionTypeRep:
connection, err = cloudevents.NewHTTP(cloudevents.WithPort(httpConfig.Port), cloudevents.WithPath("/"+httpConfig.Path))
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create HttpConnection", connectionType)
}
} else {
connection, err = newCloudEventConnection(protocol, connectionType, topic)
connection, err = newCloudEventConnection(conf, connectionType, topic)
}
if err != nil {
......@@ -56,21 +64,19 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEventProvider
// if http cloudEventConnection = nil
cloudEventConnection, _ := connection.(cloudEventConnection)
newClient := &CloudEventProviderClient{
return &CloudEventProviderClient{
context: ctx,
protocol: protocol,
protocol: conf.Protocol,
conn: cloudEventConnection,
connectionClient: connectionClient,
connectionType: connectionType,
alive: true,
}
return newClient, nil
}, nil
}
func (c *CloudEventProviderClient) Close() error {
//TODO: what about closing http?
if c.protocol == Http {
if c.protocol == ProtocolTypeHttp {
return nil
}
......@@ -86,68 +92,89 @@ func (c *CloudEventProviderClient) Alive() bool {
return c.alive
}
// Pub publishes the given event.Event
// DEPRECATED: Use PubCtx
func (c *CloudEventProviderClient) Pub(event event.Event) error {
if c.connectionType == Pub {
result := c.connectionClient.Send(c.context, event)
if err := getResultError(result); err != nil {
return err
}
return nil
} else {
return c.PubCtx(context.TODO(), event)
}
// PubCtx publishes the given event.Event
func (c *CloudEventProviderClient) PubCtx(ctx context.Context, event event.Event) error {
if c.connectionType != ConnectionTypePub {
return fmt.Errorf("pub is not supported for connectionType %s", c.connectionType)
}
result := c.connectionClient.Send(ctx, event)
if err := getResultError(result); err != nil {
return err
}
return nil
}
// Sub method is blocking. Use it in a goroutine.
// Sub Subscribes the client and calls the given fn on receive
// DEPRECATED: Use SubCtx
func (c *CloudEventProviderClient) Sub(fn func(event event.Event)) error {
if c.connectionType == Sub {
if err := c.connectionClient.StartReceiver(c.context, fn); err != nil {
return err
}
return nil
} else {
return c.SubCtx(context.TODO(), fn)
}
// SubCtx Subscribes the client and calls the given fn on receive
func (c *CloudEventProviderClient) SubCtx(ctx context.Context, fn func(event event.Event)) error {
if c.connectionType != ConnectionTypeSub {
return fmt.Errorf("sub is not supported for connectionType %s", c.connectionType)
}
return c.connectionClient.StartReceiver(ctx, fn)
}
// Request sends the given event.Event as a request and returns the response
// DEPRECATED: Use RequestCtx
func (c *CloudEventProviderClient) Request(event event.Event, timeOut time.Duration) (*event.Event, error) {
if c.connectionType == Req {
ctx := context.WithValue(c.context, "timeOut", timeOut)
response, result := c.connectionClient.Request(ctx, event)
if err := getResultError(result); err != nil {
return nil, err
}
return response, nil
} else {
ctx, cancelFn := context.WithTimeout(context.TODO(), timeOut)
defer cancelFn()
return c.RequestCtx(ctx, event)
}
// RequestCtx sends the given event.Event as a request and returns the response
func (c *CloudEventProviderClient) RequestCtx(ctx context.Context, event event.Event) (*event.Event, error) {
if c.connectionType != ConnectionTypeReq {
return nil, fmt.Errorf("request is not supported for connectionType %s", c.connectionType)
}
response, result := c.connectionClient.Request(ctx, event)
if err := getResultError(result); err != nil {
return nil, err
}
return response, nil
}
// Reply method is blocking. Use it in a goroutine.
// Reply method is blocking. Use it in a goroutine
// DEPRECATED: Use ReplyCtx
func (c *CloudEventProviderClient) Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
if c.connectionType == Rep {
switch c.protocol {
case Nats:
natsReplyConsumer, ok := c.conn.(natsReplyConsumerInterface)
if !ok {
return fmt.Errorf("reply is not supported for connectionType %s", c.connectionType)
}
if err := natsReplyConsumer.Reply(c.context, responseFunc); err != nil {
return err
}
return nil
case Http:
err := c.connectionClient.StartReceiver(c.context, responseFunc)
if err != nil {
return fmt.Errorf("error while starting receiver: %w", err)
}
return nil
default:
return fmt.Errorf("reply is not supported for protocol %s", c.protocol)
}
} else {
return c.ReplyCtx(context.TODO(), responseFunc)
}
// ReplyCtx method is blocking. Use it in a goroutine
func (c *CloudEventProviderClient) ReplyCtx(ctx context.Context, responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
if c.connectionType != ConnectionTypeRep {
return fmt.Errorf("reply is not supported for connectionType %s", c.connectionType)
}
switch c.protocol {
case ProtocolTypeHttp:
return c.connectionClient.StartReceiver(ctx, responseFunc)
case ProtocolTypeNats:
natsReplyConsumer, ok := c.conn.(natsReplyConsumerInterface)
if !ok {
return fmt.Errorf("reply is not supported for connectionType %s", c.connectionType)
}
return natsReplyConsumer.Reply(ctx, responseFunc)
default:
return fmt.Errorf("reply is not supported for protocol %s", c.protocol)
}
}
func getResultError(result protocol.Result) error {
......
package cloudeventprovider
import (
"context"
"encoding/json"
"time"
"github.com/spf13/viper"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
)
type CloudEventProvider interface {
Close() error
Alive() bool
Pub(event event.Event) error
PubCtx(ctx context.Context, event event.Event) error
Sub(fn func(event event.Event)) error
SubCtx(ctx context.Context, fn func(event event.Event)) error
Request(event event.Event, timeOut time.Duration) (*event.Event, error)
RequestCtx(ctx context.Context, event event.Event) (*event.Event, error)
Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error
ReplyCtx(ctx context.Context, responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error
}
func New(conf Config, conType ConnectionType, topic string) (*CloudEventProviderClient, error) {
return newClient(conf, conType, topic)
}
func NewClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
cloudEventProviderViper = viper.New()
config, err := loadConfig()
if err != nil {
return nil, err
}
return newClient(*config, connectionType, topic)
}
func NewEvent(eventSource string, eventType string, data json.RawMessage) (event.Event, error) {
newEvent := cloudevents.NewEvent()
newEvent.SetID(uuid.New().String())
newEvent.SetID(uuid.NewString())
newEvent.SetSource(eventSource)
newEvent.SetType(eventType)
if err := newEvent.SetData(cloudevents.ApplicationJSON, data); err != nil {
return newEvent, err
}
return newEvent, nil
}
func NewClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
if err := loadConfig(); err != nil {
panic(err)
}
return newClient(connectionType, topic)
}
......@@ -7,125 +7,175 @@ import (
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
)
var cloudEventProviderViper *viper.Viper
func init() {
cloudEventProviderViper = viper.New()
type Config struct {
Protocol ProtocolType `mapstructure:"protocol"`
Settings protocolConfig
}
type cloudEventProviderConfiguration struct {
Messaging struct {
Protocol ProtocolType `mapstructure:"protocol"`
Nats struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"nats"`
NatsJetstream struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
StreamType string `mapstructure:"streamType"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"natsJetstream"`
Kafka struct {
Url string `mapstructure:"url"`
GroupId string `mapstructure:"groupId,omitempty"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"kafka"`
Mqtt struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"mqtt"`
Ampq struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"amqp"`
Http struct {
Url string `mapstructure:"url"`
Port int `mapstructure:"port"`
Path string `mapstructure:"path"`
} `mapstructure:"http"`
} `mapstructure:"messaging"`
type protocolConfig interface {
validate() error
}
var currentCloudEventConfig cloudEventProviderConfiguration
type NatsConfig struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
}
func loadConfig() error {
if err := bindEnvs(); err != nil {
return err
func (c NatsConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
readConfig()
if err := cloudEventProviderViper.Unmarshal(&currentCloudEventConfig); err != nil {
return err
return nil
}
type NatsJetstreamConfig struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
StreamType string `mapstructure:"streamType"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
}
func (c NatsJetstreamConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
if err := checkConfig(); err != nil {
return err
if c.StreamType == "" {
return errors.New("missing value for required field StreamType")
}
return nil
}
func checkConfig() error {
if cloudEventProviderViper.IsSet("messaging.protocol") {
err := checkIfProtocolConfigIsSet(currentCloudEventConfig.Messaging.Protocol)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("protocol is not set")
}
}
func checkIfProtocolConfigIsSet(protocol ProtocolType) error {
//TODO: loop trough config -> no need for switch
var err error
switch protocol {
case Http:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Http)
case Nats:
//TODO: check for subject name conventions https://docs.nats.io/nats-concepts/subjects
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Nats)
case NatsJetstream:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.NatsJetstream)
case Kafka:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Kafka)
case Mqtt:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Mqtt)
case Amqp:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
type KafkaConfig struct {
Url string `mapstructure:"url"`
GroupId string `mapstructure:"groupId,omitempty"`
ClientId string `mapstructure:"clientId"`
}
func (c KafkaConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
return err
if c.ClientId == "" {
return errors.New("missing value for required field ClientId")
}
return nil
}
func checkIfAllConfigKeysAreSet(protocol ProtocolType, config interface{}) error {
var configMap map[string]interface{}
if err := mapstructure.Decode(config, &configMap); err != nil {
return fmt.Errorf("could not decode config to map for checking config: %w", err)
type MqttConfig struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
}
func (c MqttConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
for configKey := range configMap {
if !cloudEventProviderViper.IsSet("messaging." + string(protocol) + "." + configKey) {
return fmt.Errorf("%w: missing configKey %s for protocol %s", ErrConfigKeyMissing, configKey, protocol)
}
if c.ClientId == "" {
return errors.New("missing value for required field ClientId")
}
return nil
}
type AmqpConfig struct {
Url string `mapstructure:"url"`
}
func (c AmqpConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
return nil
}
type HttpConfig struct {
Url string `mapstructure:"url"`
Port int `mapstructure:"port"`
Path string `mapstructure:"path"`
}
func (c HttpConfig) validate() error {
if c.Url == "" {
return errors.New("missing value for required field Url")
}
if c.Port == 0 {
return errors.New("missing value for required field Port")
}
if c.Path == "" {
return errors.New("missing value for required field Path")
}
return nil
}
type cloudEventProviderConfiguration struct {
Messaging struct {
Protocol ProtocolType `mapstructure:"protocol"`
Nats NatsConfig `mapstructure:"nats"`
NatsJetstream NatsJetstreamConfig `mapstructure:"natsJetstream"`
Kafka KafkaConfig `mapstructure:"kafka"`
Mqtt MqttConfig `mapstructure:"mqtt"`
Amqp AmqpConfig `mapstructure:"amqp"`
Http HttpConfig `mapstructure:"http"`
} `mapstructure:"messaging"`
}
func loadConfig() (*Config, error) {
if err := bindEnvs(); err != nil {
return nil, err
}
readConfig()
var config cloudEventProviderConfiguration
if err := cloudEventProviderViper.Unmarshal(&config); err != nil {
return nil, err
}
var proConf protocolConfig
switch config.Messaging.Protocol {
case ProtocolTypeHttp:
proConf = config.Messaging.Http
case ProtocolTypeNats:
proConf = config.Messaging.Nats
case ProtocolTypeNatsJetstream:
proConf = config.Messaging.NatsJetstream
case ProtocolTypeAmqp:
proConf = config.Messaging.Amqp
case ProtocolTypeMqtt:
proConf = config.Messaging.Mqtt
case ProtocolTypeKafka:
proConf = config.Messaging.Kafka
default:
return nil, fmt.Errorf("missing or invalid protocol")
}
if err := proConf.validate(); err != nil {
return nil, err
}
conf := Config{
Protocol: config.Messaging.Protocol,
Settings: proConf,
}
return &conf, nil
}
func bindEnvs() error {
envs := []string{
"messaging.protocol",
......@@ -148,12 +198,10 @@ func bindEnvs() error {
"messaging.http.path",
}
for _, env := range envs {
err := cloudEventProviderViper.BindEnv(env)
if err != nil {
return fmt.Errorf("could not bind env %s: %w", env, err)
}
if err := cloudEventProviderViper.BindEnv(envs...); err != nil {
return fmt.Errorf("could not bind env: %w", err)
}
return nil
}
......@@ -170,6 +218,7 @@ func readConfig() {
if errors.As(err, &configFileNotFoundError) {
log.Printf("Configuration not found but environment variables will be taken into account.")
}
cloudEventProviderViper.AutomaticEnv()
}
}
package cloudeventprovider
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestOptionalConfig(t *testing.T) {
......@@ -12,10 +14,12 @@ func TestOptionalConfig(t *testing.T) {
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_QUEUEGROUP", "test")
err := loadConfig()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
conf, err := loadConfig()
require.NoError(t, err)
assert.Equal(t, conf.Protocol, ProtocolTypeNats)
assert.Implements(t, (*protocolConfig)(nil), conf.Settings)
}
func TestMissingConfig(t *testing.T) {
......@@ -23,8 +27,9 @@ func TestMissingConfig(t *testing.T) {
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_URL", "http://localhost:4222")
err := loadConfig()
if !errors.Is(errors.Unwrap(err), ErrConfigKeyMissing) {
t.Errorf("Expected loadConfig to throw an ErrConfigKeyMissing but err is '%v' instead", err)
}
conf, err := loadConfig()
require.Error(t, err)
assert.Nil(t, conf)
assert.ErrorIs(t, err, ErrConfigKeyMissing)
}
......@@ -3,171 +3,116 @@ package cloudeventprovider
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/nats-io/nats.go"
"log"
"net"
"net/url"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/Azure/go-amqp"
"github.com/IBM/sarama"
ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
kafkaSarama "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
mqttPaho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"github.com/nats-io/nats.go"
)
type cloudEventConnection interface {
Close(ctx context.Context) error
}
func newCloudEventConnection(protocolType ProtocolType, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
var newConnection cloudEventConnection
var err error
switch protocolType {
case Kafka:
newConnection, err = newKafkaConnection(connectionType, topic)
if err != nil {
return nil, err
}
case Nats:
newConnection, err = newNatsConnection(connectionType, topic)
if err != nil {
return nil, err
}
case NatsJetstream:
newConnection, err = newNatsJetstreamConnection(connectionType, topic)
if err != nil {
return nil, err
}
case Mqtt:
newConnection, err = newMqttConnection(connectionType, topic)
if err != nil {
return nil, err
}
case Amqp:
newConnection, err = newAmqpConnection(connectionType, topic)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown protocolType: %s. Could not create cloudEventConnection", protocolType)
func newCloudEventConnection(conf Config, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
switch conf.Protocol {
case ProtocolTypeKafka:
return newKafkaConnection(conf.Settings.(KafkaConfig), connectionType, topic)
case ProtocolTypeNats:
return newNatsConnection(conf.Settings.(NatsConfig), connectionType, topic)
case ProtocolTypeNatsJetstream:
return newNatsJetstreamConnection(conf.Settings.(NatsJetstreamConfig), connectionType, topic)
case ProtocolTypeMqtt:
return newMqttConnection(conf.Settings.(MqttConfig), connectionType, topic)
case ProtocolTypeAmqp:
return newAmqpConnection(conf.Settings.(AmqpConfig), connectionType, topic)
}
return newConnection, nil
}
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
kafkaConfig := currentCloudEventConfig.Messaging.Kafka
return nil, fmt.Errorf("unknown protocolType: %s. Could not create cloudEventConnection", conf.Protocol)
}
func newKafkaConnection(conf KafkaConfig, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
switch connectionType {
case Pub:
sender, err := kafka_sarama.NewSender([]string{kafkaConfig.Url}, saramaConfig, topic)
if err != nil {
return nil, err
}
return sender, nil
case Sub:
receiver, err := kafka_sarama.NewConsumer([]string{kafkaConfig.Url}, saramaConfig, kafkaConfig.GroupId, topic)
if err != nil {
return nil, err
}
return receiver, nil
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create KafkaConnection", connectionType)
case ConnectionTypePub:
return kafkaSarama.NewSender([]string{conf.Url}, saramaConfig, topic)
case ConnectionTypeSub:
return kafkaSarama.NewConsumer([]string{conf.Url}, saramaConfig, conf.GroupId, topic)
}
return nil, fmt.Errorf("unknown connectionType: %s. Could not create KafkaConnection", connectionType)
}
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsConfig := currentCloudEventConfig.Messaging.Nats
func newNatsConnection(conf NatsConfig, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
var natsOptions []nats.Option
if natsConfig.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(natsConfig.TimeoutInSec*time.Second))
if conf.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(conf.TimeoutInSec*time.Second))
}
switch connectionType {
case Pub:
sender, err := cenats.NewSender(natsConfig.Url, topic, natsOptions)
if err != nil {
return nil, err
}
return sender, nil
case Sub:
var consumerOption cenats.ConsumerOption
if natsConfig.QueueGroup != "" {
consumerOption = cenats.WithQueueSubscriber(natsConfig.QueueGroup)
}
consumer, err := cenats.NewConsumer(natsConfig.Url, topic, natsOptions, consumerOption)
if err != nil {
return nil, err
}
return consumer, nil
case Req:
requester, err := newNatsRequester(natsConfig.Url, topic, natsOptions...)
if err != nil {
return nil, err
}
return requester, nil
case Rep:
respondConsumer, err := newNatsRespondConsumer(natsConfig.Url, topic, natsConfig.QueueGroup, natsOptions...)
if err != nil {
return nil, err
}
return respondConsumer, nil
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsConnection", connectionType)
case ConnectionTypePub:
return cenats.NewSender(conf.Url, topic, natsOptions)
case ConnectionTypeSub:
consumerOptions := make([]cenats.ConsumerOption, 0)
if conf.QueueGroup != "" {
consumerOptions = append(consumerOptions, cenats.WithQueueSubscriber(conf.QueueGroup))
}
return cenats.NewConsumer(conf.Url, topic, natsOptions, consumerOptions...)
case ConnectionTypeReq:
return newNatsRequester(conf.Url, topic, natsOptions...)
case ConnectionTypeRep:
return newNatsRespondConsumer(conf.Url, topic, conf.QueueGroup, natsOptions...)
}
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsConnection", connectionType)
}
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsJetstreamConfig := currentCloudEventConfig.Messaging.NatsJetstream
func newNatsJetstreamConnection(conf NatsJetstreamConfig, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
var natsJetstreamOptions []nats.Option
if natsJetstreamConfig.TimeoutInSec != 0*time.Second {
natsJetstreamOptions = append(natsJetstreamOptions, nats.Timeout(natsJetstreamConfig.TimeoutInSec*time.Second))
if conf.TimeoutInSec != 0 {
natsJetstreamOptions = []nats.Option{nats.Timeout(conf.TimeoutInSec * time.Second)}
}
switch connectionType {
case Pub:
sender, err := cejsm.NewSender(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, topic, natsJetstreamOptions, nil)
if err != nil {
return nil, err
}
return sender, nil
case Sub:
case ConnectionTypePub:
return cejsm.NewSender(conf.Url, conf.StreamType, topic, natsJetstreamOptions, nil)
case ConnectionTypeSub:
var consumerOption cejsm.ConsumerOption
if natsJetstreamConfig.QueueGroup != "" {
consumerOption = cejsm.WithQueueSubscriber(natsJetstreamConfig.QueueGroup)
if conf.QueueGroup != "" {
consumerOption = cejsm.WithQueueSubscriber(conf.QueueGroup)
}
consumer, err := cejsm.NewConsumer(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, topic, natsJetstreamOptions, nil, nil, consumerOption)
if err != nil {
return nil, err
}
return consumer, nil
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsJetstreamConnection", connectionType)
return cejsm.NewConsumer(conf.Url, conf.StreamType, topic, natsJetstreamOptions, nil, nil, consumerOption)
}
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsJetstreamConnection", connectionType)
}
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
mqttConfig := currentCloudEventConfig.Messaging.Mqtt
func newMqttConnection(conf MqttConfig, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
ctx := context.Background()
conn, err := net.Dial("tcp", mqttConfig.Url)
conn, err := net.Dial("tcp", conf.Url)
if err != nil {
return nil, err
}
switch connectionType {
case Pub:
case ConnectionTypePub:
connectionConfig := &paho.ClientConfig{
ClientID: mqttConfig.ClientId,
ClientID: conf.ClientId,
Conn: conn,
}
// optional connect option
......@@ -181,9 +126,9 @@ func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventC
return nil, err
}
return sender, nil
case Sub:
case ConnectionTypeSub:
connectionConfig := &paho.ClientConfig{
ClientID: mqttConfig.ClientId,
ClientID: conf.ClientId,
Conn: conn,
}
subscribeOpt := &paho.Subscribe{
......@@ -205,36 +150,35 @@ func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventC
}
}
func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
amqpUrl, node, opts := parseAmqpConfig()
if connectionType == Sub || connectionType == Pub {
var protocol *ceamqp.Protocol
var err error
if topic != "" {
protocol, err = ceamqp.NewProtocol(amqpUrl, topic, []amqp.ConnOption{}, []amqp.SessionOption{}, opts...)
} else {
protocol, err = ceamqp.NewProtocol(amqpUrl, node, []amqp.ConnOption{}, []amqp.SessionOption{}, opts...)
}
if err != nil {
return nil, err
}
return protocol, nil
} else {
func newAmqpConnection(conf AmqpConfig, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
if connectionType != ConnectionTypeSub && connectionType != ConnectionTypePub {
return nil, fmt.Errorf("unknown connectionType: %s. Could not create AmqpConnection", connectionType)
}
amqpUrl, node, opts := parseAmqpConfig(conf)
if topic != "" {
return ceamqp.NewProtocol(amqpUrl, topic, nil, nil, opts...)
}
return ceamqp.NewProtocol(amqpUrl, node, nil, nil, opts...)
}
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
func parseAmqpConfig(conf AmqpConfig) (string, string, []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = currentCloudEventConfig.Messaging.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
parsedUrl, err := url.Parse(conf.Url)
if err != nil {
log.Fatal(err)
}
if parsedUrl.User != nil {
user := parsedUrl.User.Username()
pass, _ := parsedUrl.User.Password()
opts = append(opts, ceamqp.WithConnOpt(amqp.ConnSASLPlain(user, pass)))
if parsedUrl.User == nil {
return conf.Url, strings.TrimPrefix(parsedUrl.Path, "/"), nil
}
return amqpUrl, strings.TrimPrefix(parsedUrl.Path, "/"), opts
user := parsedUrl.User.Username()
pass, _ := parsedUrl.User.Password()
return conf.Url,
strings.TrimPrefix(parsedUrl.Path, "/"),
[]ceamqp.Option{ceamqp.WithConnOpt(amqp.ConnSASLPlain(user, pass))}
}
module gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider
go 1.21.4
go 1.21
require (
github.com/Shopify/sarama v1.38.1
github.com/Azure/go-amqp v0.17.0
github.com/IBM/sarama v1.42.2
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240112101907-998919d6876d
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240212142714-4cc6c2d62d63
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.15.0
github.com/eclipse/paho.golang v0.12.0
github.com/google/uuid v1.5.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.32.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
)
require (
github.com/Azure/go-amqp v0.17.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/go-resiliency v1.5.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
......@@ -32,17 +32,19 @@ require (
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
......@@ -54,11 +56,11 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
......
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0 h1:RHQ4j4gtKyRekiMI2k58cBlf8oJjDYhStfurd0VI25w=
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0/go.mod h1:6C9Iu3TSM1alrSktRBUwDJfOum1CyJe+k2QzlL75xM4=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0 h1:1MCVOxNZySIYOWMI1+6Z7YR0PK3AmDi/Fklk1KdFIv8=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0/go.mod h1:/B8nchIwQlr00jtE9bR0aoKaag7bO67xPM7r1DXCH4I=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240112101907-998919d6876d h1:BLWoDnB8RIwMrp7FJtHCdVmglikQ3BSttZtA9Il4Sx0=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240112101907-998919d6876d/go.mod h1:s+KZsVZst0bVW6vuKYb8CH49CcSJDO09+ZiIeKzJmqE=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.0 h1:YIsMNgteY2QBjE2sJ13bOXBi0Jzl/iPAIq6Ayr4l6Go=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.15.0/go.mod h1:bRB2h22ARQl0EqVVmPTK+valYhDdLAdNDc3wLYsw7qw=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240212142714-4cc6c2d62d63 h1:BuPrAR70nWYd8V8/0Q5BX1rw1CujnGyWt6LF6EmnSLg=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240212142714-4cc6c2d62d63/go.mod h1:s+KZsVZst0bVW6vuKYb8CH49CcSJDO09+ZiIeKzJmqE=
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0 h1:cPOXwhwRb+RtHrPSs6Qmobgt4q/0e4wNBdfUjOeV9Qw=
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0/go.mod h1:BQefJHVdyw9MqEG5EdualOQ/JgYMViAEzkSbAp6qCKA=
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.14.0 h1:Iq1ivWHGtkg10E38bxQj0pbLe9C6AWhX49i+QVxUilw=
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.14.0/go.mod h1:Kly/VEwcO3vyOY5Rd17sTALl1rtzGyPncfTHHSvA2gY=
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/cloudevents/sdk-go/v2 v2.15.0 h1:aKnhLQhyoJXqEECQdOIZnbZ9VupqlidE6hedugDGr+I=
github.com/cloudevents/sdk-go/v2 v2.15.0/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/go-resiliency v1.5.0 h1:dRsaR00whmQD+SgVKlq/vCRFNgtEb5yppyeVos3Yce0=
github.com/eapache/go-resiliency v1.5.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q=
......@@ -63,8 +61,8 @@ github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVET
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8=
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
......@@ -95,8 +93,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......@@ -105,9 +103,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
......@@ -130,6 +127,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
......@@ -137,6 +135,7 @@ github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSW
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
......@@ -150,42 +149,49 @@ go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
......@@ -194,6 +200,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
......
package cloudeventprovider
type ProtocolType string
type ConnectionType string
const (
ProtocolTypeHttp ProtocolType = "http"
ProtocolTypeKafka ProtocolType = "kafka"
ProtocolTypeNats ProtocolType = "nats"
ProtocolTypeNatsJetstream ProtocolType = "natsJetstream"
ProtocolTypeMqtt ProtocolType = "mqtt"
ProtocolTypeAmqp ProtocolType = "amqp"
)
// Deprecated, use ProtocolType*
const (
Http ProtocolType = "http"
Kafka ProtocolType = "kafka"
......@@ -12,6 +21,16 @@ const (
Amqp ProtocolType = "amqp"
)
type ConnectionType string
const (
ConnectionTypePub ConnectionType = "pub"
ConnectionTypeSub ConnectionType = "sub"
ConnectionTypeReq ConnectionType = "req"
ConnectionTypeRep ConnectionType = "rep"
)
// Deprecated, use ConnectionType*
const (
Pub ConnectionType = "pub"
Sub ConnectionType = "sub"
......
package test
import (
"encoding/json"
"log"
"os"
"testing"
"time"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider"
)
/**
TODO - these tests required a working NATS server
*/
func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func Test_NewClientFromConfig(t *testing.T) {
config := cloudeventprovider.Config{
Protocol: cloudeventprovider.Nats,
Settings: cloudeventprovider.NatsConfig{
Url: "127.0.0.1",
TimeoutInSec: 0,
},
}
sender, err := cloudeventprovider.NewClientFromConfig(config, cloudeventprovider.Pub, "testing")
require.NoError(t, err)
receiver, err := cloudeventprovider.NewClientFromConfig(config, cloudeventprovider.Sub, "testing")
require.NoError(t, err)
testData, err := json.Marshal("test")
if err != nil {
t.Fatalf("failed to marshal test data: %v", err)
}
events := make(chan event.Event)
go func(client *cloudeventprovider.CloudEventProviderClient, ch chan event.Event) {
t.Logf("starting sub...")
err := client.Sub(func(event event.Event) {
log.Printf("RECEIVED EVENT: %v", event)
ch <- event
})
require.NoError(t, err)
}(receiver, events)
go func() {
time.Sleep(time.Second)
e, err := cloudeventprovider.NewEvent("testing", "testing", testData)
require.NoError(t, err)
t.Log("sending event...")
require.NoError(t, sender.Pub(e))
}()
received := <-events
assert.Equal(t, "testing", received.Type())
assert.Equal(t, testData, received.Data())
}
func Test_NewClient(t *testing.T) {
client, err := cloudeventprovider.NewClient(cloudeventprovider.Sub, "testing")
require.NoError(t, err)
assert.NotNil(t, client)
}
messaging:
protocol: nats
nats:
url: http://localhost:4222
timeoutInSec: 10
\ No newline at end of file
# Binary files (no line-ending conversions), diff using hexdump
*.bin binary diff=hex
amqp.test
/fuzz/*/*
!/fuzz/*/corpus
/fuzz/*.zip
*.log
/cmd
cover.out
.envrc
recordings
.vscode
.idea
*.env
\ No newline at end of file
# Microsoft Open Source Code of Conduct
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
Resources:
- [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/)
- [Microsoft Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/)
- Contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with questions or concerns
# Contributing
This repo is no longer under active development. See [issue #205](https://github.com/vcabbage/amqp/issues/205) for details.
~~Whether it's code, documentation, and/or example, all contributions are appreciated.~~
~~To ensure a smooth process, here are some guidelines and expectations:~~
* ~~An issue should be created discussing any non-trivial change. Small changes, such as a fixing a typo, don't need an issue.~~
* ~~Ideally, an issue should describe both the problem to be solved and a proposed solution.~~
* ~~Please indicate that you want to work on the change in the issue. If you change your mind about working on an issue you are always free to back out. There will be no hard feelings.~~
* ~~Depending on the scope, there may be some back and forth about the problem and solution. This is intended to be a collaborative discussion to ensure the problem is adequately solved in a manner that fits well in the library.~~
~~Once you're ready to open a PR:~~
* ~~Ensure code is formatted with `gofmt`.~~
* ~~You may also want to peruse https://github.com/golang/go/wiki/CodeReviewComments and check that code conforms to the recommendations.~~
* ~~Tests are appreciated, but not required. The integration tests are currently specific to Microsoft Azure and require a number of credentials provided via environment variables. This can be a high barrier if you don't already have setup that works with the tests.~~
* ~~When you open the PR CI will run unit tests. Integration tests will be run manually as part of the review.~~
* ~~All PRs will be merged as a single commit. If your PR includes multiple commits they will be squashed together before merging. This usually isn't a big deal, but if you have any questions feel free to ask.~~
~~I do my best to respond to issues and PRs in a timely fashion. If it's been a couple days without a response or if it seems like I've overlooked something, feel free to ping me.~~
## Debugging
### Logging
To enable debug logging, build with `-tags debug`. This enables debug level 1 by default. You can increase the level by setting the `DEBUG_LEVEL` environment variable to 2 or higher. (Debug logging is disabled entirely without `-tags debug`, regardless of `DEBUG_LEVEL` setting.)
To add additional logging, use the `debug(level int, format string, v ...interface{})` function, which is similar to `fmt.Printf` but takes a level as it's first argument.
### Packet Capture
Wireshark can be very helpful in diagnosing interactions between client and server. If the connection is not encrypted Wireshark can natively decode AMQP 1.0. If the connection is encrypted with TLS you'll need to log out the keys.
Example of logging the TLS keys:
```go
// Create the file
f, err := os.OpenFile("key.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
// Configure TLS
tlsConfig := &tls.Config{
KeyLogWriter: f,
}
// Dial the host
const host = "my.amqp.server"
conn, err := tls.Dial("tcp", host+":5671", tlsConfig)
// Create the connections
client, err := amqp.New(conn,
amqp.ConnSASLPlain("username", "password"),
amqp.ConnServerHostname(host),
)
```
You'll need to configure Wireshark to read the key.log file in Preferences > Protocols > SSL > (Pre)-Master-Secret log filename.
MIT License
Copyright (C) 2017 Kale Blankenship
Portions Copyright (C) Microsoft Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
PACKAGE := github.com/Azure/go-amqp
FUZZ_DIR := ./fuzz
all: test
fuzzconn:
go-fuzz-build -o $(FUZZ_DIR)/conn.zip -func FuzzConn $(PACKAGE)
go-fuzz -bin $(FUZZ_DIR)/conn.zip -workdir $(FUZZ_DIR)/conn
fuzzmarshal:
go-fuzz-build -o $(FUZZ_DIR)/marshal.zip -func FuzzUnmarshal $(PACKAGE)
go-fuzz -bin $(FUZZ_DIR)/marshal.zip -workdir $(FUZZ_DIR)/marshal
fuzzclean:
rm -f $(FUZZ_DIR)/**/{crashers,suppressions}/*
rm -f $(FUZZ_DIR)/*.zip
test:
TEST_CORPUS=1 go test -race -run=Corpus
go test -v -race ./...
#integration:
#go test -tags "integration" -count=1 -v -race .
test386:
TEST_CORPUS=1 go test -count=1 -v .
ci: test386 coverage
coverage:
TEST_CORPUS=1 go test -cover -coverprofile=cover.out -v
# **github.com/Azure/go-amqp**
[![Build Status](https://dev.azure.com/azure-sdk/public/_apis/build/status/go/Azure.go-amqp?branchName=master)](https://dev.azure.com/azure-sdk/public/_build/latest?definitionId=1292&branchName=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/Azure/go-amqp)](https://goreportcard.com/report/github.com/Azure/go-amqp)
[![GoDoc](https://godoc.org/github.com/Azure/go-amqp?status.svg)](http://godoc.org/github.com/Azure/go-amqp)
[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/Azure/go-amqp/master/LICENSE)
github.com/Azure/go-amqp is an AMQP 1.0 client implementation for Go.
[AMQP 1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html) is not compatible with AMQP 0-9-1 or 0-10, which are
the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other
AMQP 1.0 resources can be found at [github.com/xinchen10/awesome-amqp](https://github.com/xinchen10/awesome-amqp).
This library aims to be stable and worthy of production usage, but the API is still subject to change. To conform with SemVer, the major version will remain 0 until the API is deemed stable. During this period breaking changes will be indicated by bumping the minor version. Non-breaking changes will bump the patch version.
## Install
```
go get -u github.com/Azure/go-amqp
```
## Contributing
Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md).
## Example Usage
``` go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
// Create client
client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
amqp.ConnSASLPlain("access-key-name", "access-key"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
// Send a message
{
// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress("/queue-name"),
)
if err != nil {
log.Fatal("Creating sender link:", err)
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// Send message
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
if err != nil {
log.Fatal("Sending message:", err)
}
sender.Close(ctx)
cancel()
}
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(10),
)
if err != nil {
log.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
log.Fatal("Reading message from AMQP:", err)
}
// Accept message
msg.Accept(context.Background())
fmt.Printf("Message received: %s\n", msg.GetData())
}
}
}
```
## Related Projects
| Project | Description |
|---------|-------------|
| [github.com/Azure/azure-event-hubs-go](https://github.com/Azure/azure-event-hubs-go) * | Library for interacting with Microsoft Azure Event Hubs. |
| [github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus](https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azservicebus) * | Library for interacting with Microsoft Azure Service Bus. |
| [gocloud.dev/pubsub](https://gocloud.dev/pubsub) * | Library for portably interacting with Pub/Sub systems. |
| [qpid-proton](https://github.com/apache/qpid-proton/tree/go1) | AMQP 1.0 library using the Qpid Proton C bindings. |
`*` indicates that the project uses this library.
Feel free to send PRs adding additional projects. Listed projects are not limited to those that use this library as long as they are potentially useful to people who are looking at an AMQP library.
# Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
<!-- BEGIN MICROSOFT SECURITY.MD V0.0.3 BLOCK -->
## Security
Microsoft takes the security of our software products and services seriously, which includes all source code repositories managed through our GitHub organizations, which include [Microsoft](https://github.com/Microsoft), [Azure](https://github.com/Azure), [DotNet](https://github.com/dotnet), [AspNet](https://github.com/aspnet), [Xamarin](https://github.com/xamarin), and [our GitHub organizations](https://opensource.microsoft.com/).
If you believe you have found a security vulnerability in any Microsoft-owned repository that meets Microsoft's [Microsoft's definition of a security vulnerability](https://docs.microsoft.com/en-us/previous-versions/tn-archive/cc751383(v=technet.10)) of a security vulnerability, please report it to us as described below.
## Reporting Security Issues
**Please do not report security vulnerabilities through public GitHub issues.**
Instead, please report them to the Microsoft Security Response Center (MSRC) at [https://msrc.microsoft.com/create-report](https://msrc.microsoft.com/create-report).
If you prefer to submit without logging in, send email to [secure@microsoft.com](mailto:secure@microsoft.com). If possible, encrypt your message with our PGP key; please download it from the the [Microsoft Security Response Center PGP Key page](https://www.microsoft.com/en-us/msrc/pgp-key-msrc).
You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Additional information can be found at [microsoft.com/msrc](https://www.microsoft.com/msrc).
Please include the requested information listed below (as much as you can provide) to help us better understand the nature and scope of the possible issue:
* Type of issue (e.g. buffer overflow, SQL injection, cross-site scripting, etc.)
* Full paths of source file(s) related to the manifestation of the issue
* The location of the affected source code (tag/branch/commit or direct URL)
* Any special configuration required to reproduce the issue
* Step-by-step instructions to reproduce the issue
* Proof-of-concept or exploit code (if possible)
* Impact of the issue, including how an attacker might exploit the issue
This information will help us triage your report more quickly.
If you are reporting for a bug bounty, more complete reports can contribute to a higher bounty award. Please visit our [Microsoft Bug Bounty Program](https://microsoft.com/msrc/bounty) page for more details about our active programs.
## Preferred Languages
We prefer all communications to be in English.
## Policy
Microsoft follows the principle of [Coordinated Vulnerability Disclosure](https://www.microsoft.com/en-us/msrc/cvd).
<!-- END MICROSOFT SECURITY.MD BLOCK -->
variables:
GO111MODULE: 'on'
AMQP_BROKER_ADDR: 'amqp://127.0.0.1:25672'
jobs:
- job: 'goamqp'
displayName: 'Run go-amqp CI Checks'
strategy:
matrix:
Linux_Go116:
vm.image: 'ubuntu-18.04'
go.version: '1.16.11'
Linux_Go117:
vm.image: 'ubuntu-18.04'
go.version: '1.17.4'
pool:
vmImage: '$(vm.image)'
steps:
- task: GoTool@0
inputs:
version: '$(go.version)'
displayName: "Select Go Version"
- script: |
set -e
export gopathbin=$(go env GOPATH)/bin
echo "##vso[task.prependpath]$gopathbin"
go get github.com/jstemmer/go-junit-report
go get github.com/axw/gocov/gocov
go get github.com/AlekSi/gocov-xml
go get github.com/matm/gocov-html
displayName: 'Install Dependencies'
- script: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.42.0
golangci-lint --version
golangci-lint run
displayName: 'Install and Run GoLintCLI.'
- script: |
go build -v ./...
displayName: 'Build'
- script: |
go vet ./...
displayName: 'Vet'
- task: UseDotNet@2
displayName: 'Use .NET sdk'
inputs:
packageType: sdk
version: 5.0.x
installationPath: $(Agent.ToolsDirectory)/dotnet
- script: |
git clone https://github.com/jhendrixMSFT/azure-amqp $(Pipeline.Workspace)/azure-amqp
pushd $(Pipeline.Workspace)/azure-amqp/test/TestAmqpBroker
dotnet restore
dotnet build
chmod +x $(Pipeline.Workspace)/azure-amqp/bin/Debug/TestAmqpBroker/net461/TestAmqpBroker.exe
displayName: 'Clone and Build Broker'
- script: |
set -e
echo '##[command]Starting broker at $(AMQP_BROKER_ADDR)'
$(Pipeline.Workspace)/azure-amqp/bin/Debug/TestAmqpBroker/net461/TestAmqpBroker.exe $AMQP_BROKER_ADDR /headless &
brokerPID=$!
echo '##[section]Starting tests'
go test -tags -race -v -coverprofile=coverage.txt -covermode atomic ./... 2>&1 | tee gotestoutput.log
go-junit-report < gotestoutput.log > report.xml
kill $brokerPID
gocov convert coverage.txt > coverage.json
gocov-xml < coverage.json > coverage.xml
gocov-html < coverage.json > coverage.html
displayName: 'Run Tests'
- script: |
gofmt -s -l -w . >&2
displayName: 'Format Check'
failOnStderr: true
condition: succeededOrFailed()
- task: PublishTestResults@2
inputs:
testRunner: JUnit
testResultsFiles: report.xml
failTaskOnFailedTests: true
- task: PublishCodeCoverageResults@1
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: coverage.xml
additionalCodeCoverageFiles: coverage.html
package amqp
import (
"encoding/base64"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Client is an AMQP client connection.
type Client struct {
conn *conn
}
// Dial connects to an AMQP server.
//
// If the addr includes a scheme, it must be "amqp" or "amqps".
// If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps".
//
// If username and password information is not empty it's used as SASL PLAIN
// credentials, equal to passing ConnSASLPlain option.
func Dial(addr string, opts ...ConnOption) (*Client, error) {
c, err := dialConn(addr, opts...)
if err != nil {
return nil, err
}
err = c.Start()
if err != nil {
return nil, err
}
return &Client{conn: c}, nil
}
// New establishes an AMQP client connection over conn.
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
if err != nil {
return nil, err
}
err = c.Start()
return &Client{conn: c}, err
}
// Close disconnects the connection.
func (c *Client) Close() error {
return c.conn.Close()
}
// NewSession opens a new AMQP session to the server.
// Returns ErrConnClosed if the underlying connection has been closed.
func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
// get a session allocated by Client.mux
var sResp newSessionResp
select {
case <-c.conn.Done:
return nil, c.conn.Err()
case sResp = <-c.conn.NewSession:
}
if sResp.err != nil {
return nil, sResp.err
}
s := sResp.session
for _, opt := range opts {
err := opt(s)
if err != nil {
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, err
}
}
// send Begin to server
begin := &frames.PerformBegin{
NextOutgoingID: 0,
IncomingWindow: s.incomingWindow,
OutgoingWindow: s.outgoingWindow,
HandleMax: s.handleMax,
}
debug(1, "TX (NewSession): %s", begin)
_ = s.txFrame(begin, nil)
// wait for response
var fr frames.Frame
select {
case <-c.conn.Done:
return nil, c.conn.Err()
case fr = <-s.rx:
}
debug(1, "RX (NewSession): %s", fr.Body)
begin, ok := fr.Body.(*frames.PerformBegin)
if !ok {
// this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not
// yet seen the remote channel number, the default clause in conn.mux will protect us from that.
// if we have seen the remote channel number then it's likely the session.mux for that channel will
// either swallow the frame or blow up in some other way, both causing this call to hang.
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, fmt.Errorf("unexpected begin response: %+v", fr.Body)
}
// start Session multiplexor
go s.mux(begin)
return s, nil
}
// Default session options
const (
DefaultMaxLinks = 4294967296
DefaultWindow = 1000
)
// SessionOption is an function for configuring an AMQP session.
type SessionOption func(*Session) error
// SessionIncomingWindow sets the maximum number of unacknowledged
// transfer frames the server can send.
func SessionIncomingWindow(window uint32) SessionOption {
return func(s *Session) error {
s.incomingWindow = window
return nil
}
}
// SessionOutgoingWindow sets the maximum number of unacknowledged
// transfer frames the client can send.
func SessionOutgoingWindow(window uint32) SessionOption {
return func(s *Session) error {
s.outgoingWindow = window
return nil
}
}
// SessionMaxLinks sets the maximum number of links (Senders/Receivers)
// allowed on the session.
//
// n must be in the range 1 to 4294967296.
//
// Default: 4294967296.
func SessionMaxLinks(n int) SessionOption {
return func(s *Session) error {
if n < 1 {
return errors.New("max sessions cannot be less than 1")
}
if int64(n) > 4294967296 {
return errors.New("max sessions cannot be greater than 4294967296")
}
s.handleMax = uint32(n - 1)
return nil
}
}
// lockedRand provides a rand source that is safe for concurrent use.
type lockedRand struct {
mu sync.Mutex
src *rand.Rand
}
func (r *lockedRand) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Read(p)
}
// package scoped rand source to avoid any issues with seeding
// of the global source.
var pkgRand = &lockedRand{
src: rand.New(rand.NewSource(time.Now().UnixNano())),
}
// randBytes returns a base64 encoded string of n bytes.
func randString(n int) string {
b := make([]byte, n)
// from math/rand, cannot fail
_, _ = pkgRand.Read(b)
return base64.RawURLEncoding.EncodeToString(b)
}
// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple
// (source-container-id, target-container-id, name)
// On a single connection the container ID pairs can be abbreviated
// to a boolean flag indicating the direction of the link.
type linkKey struct {
name string
role encoding.Role // Local role: sender/receiver
}
const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment