Skip to content
Snippets Groups Projects
Commit bfe6960e authored by Steffen Schulze's avatar Steffen Schulze
Browse files

Merge branch 'main' into 'main'

Add QueueGroup and change configuration structure

See merge request eclipse/xfsc/libraries/messaging/cloudeventprovider!4
parents 595a846f a64a8f66
No related branches found
No related tags found
No related merge requests found
Showing
with 2029 additions and 585 deletions
......@@ -14,18 +14,19 @@ import "gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider"
## Configure protocol and corresponding config with yaml File
```yaml
protocol: nats
nats:
url: http://localhost:4222
queueGroup: logger #optional
timeoutInSec: 10 #optional
messaging:
protocol: nats
nats:
url: http://localhost:4222
queueGroup: logger #optional
timeoutInSec: 10 #optional
```
## Send an Cloudevent
```go
func main() {
topic := "events"
c, err := cloudeventprovider.CreateClient(cloudeventprovider.Pub, topic)
c, err := cloudeventprovider.NewClient(cloudeventprovider.Pub, topic)
if err != nil {
log.Fatal(err)
}
......@@ -45,13 +46,13 @@ func main() {
## Receive an Cloudevent
```go
func receive(event cloudevents.Event) {
func receive(event event.Event) {
fmt.Printf("%s", event)
}
func main() {
topic := "events"
client, err := cloudeventprovider.CreateClient(cloudeventprovider.Sub, topic)
client, err := cloudeventprovider.NewClient(cloudeventprovider.Sub, topic)
if err != nil {
log.Fatal(err)
}
......
......@@ -22,14 +22,14 @@ type CloudEventProviderClient struct {
// newClient ignores topic parameter for http connections
func newClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
protocol := currentCloudEventConfig.Protocol
protocol := currentCloudEventConfig.Messaging.Protocol
ctx := context.Background()
var connection interface{}
var err error
if protocol == Http {
httpConfig := currentCloudEventConfig.Http
httpConfig := currentCloudEventConfig.Messaging.Http
switch connectionType {
case Pub, Req:
......
......@@ -18,47 +18,52 @@ func init() {
}
type cloudEventProviderConfiguration struct {
Protocol ProtocolType `mapstructure:"protocol"`
Nats struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"nats"`
NatsJetstream struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
StreamType string `mapstructure:"streamType"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"natsJetstream"`
Kafka struct {
Url string `mapstructure:"url"`
GroupId string `mapstructure:"groupId,omitempty"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"kafka"`
Mqtt struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"mqtt"`
Ampq struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"amqp"`
Http struct {
Url string `mapstructure:"url"`
Port int `mapstructure:"port"`
Path string `mapstructure:"path"`
} `mapstructure:"http"`
Messaging struct {
Protocol ProtocolType `mapstructure:"protocol"`
Nats struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"nats"`
NatsJetstream struct {
Url string `mapstructure:"url"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
StreamType string `mapstructure:"streamType"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"natsJetstream"`
Kafka struct {
Url string `mapstructure:"url"`
GroupId string `mapstructure:"groupId,omitempty"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"kafka"`
Mqtt struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"mqtt"`
Ampq struct {
Url string `mapstructure:"url"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"amqp"`
Http struct {
Url string `mapstructure:"url"`
Port int `mapstructure:"port"`
Path string `mapstructure:"path"`
} `mapstructure:"http"`
} `mapstructure:"messaging"`
}
var currentCloudEventConfig cloudEventProviderConfiguration
func loadConfig() error {
if err := bindEnvs(); err != nil {
return err
}
readConfig()
if err := cloudEventProviderViper.Unmarshal(&currentCloudEventConfig); err != nil {
......@@ -71,9 +76,8 @@ func loadConfig() error {
}
func checkConfig() error {
if cloudEventProviderViper.IsSet("protocol") {
protocol := cloudEventProviderViper.GetString("protocol")
err := checkIfProtocolConfigIsSet(protocol)
if cloudEventProviderViper.IsSet("messaging.protocol") {
err := checkIfProtocolConfigIsSet(currentCloudEventConfig.Messaging.Protocol)
if err != nil {
return err
}
......@@ -83,45 +87,76 @@ func checkConfig() error {
}
}
func checkIfProtocolConfigIsSet(protocol string) error {
func checkIfProtocolConfigIsSet(protocol ProtocolType) error {
//TODO: loop trough config -> no need for switch
var err error
switch protocol {
case string(Http):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Http)
case string(Nats):
case Http:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Http)
case Nats:
//TODO: check for subject name conventions https://docs.nats.io/nats-concepts/subjects
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Nats)
case string(NatsJetstream):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.NatsJetstream)
case string(Kafka):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Kafka)
case string(Mqtt):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Mqtt)
case string(Amqp):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Ampq)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Nats)
case NatsJetstream:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.NatsJetstream)
case Kafka:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Kafka)
case Mqtt:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Mqtt)
case Amqp:
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Messaging.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
}
return err
}
func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
func checkIfAllConfigKeysAreSet(protocol ProtocolType, config interface{}) error {
var configMap map[string]interface{}
if err := mapstructure.Decode(config, &configMap); err != nil {
return fmt.Errorf("could not decode config to map for checking config: %w", err)
}
for configKey := range configMap {
if !cloudEventProviderViper.IsSet(protocol + "." + configKey) {
if !cloudEventProviderViper.IsSet("messaging." + string(protocol) + "." + configKey) {
return fmt.Errorf("%w: missing configKey %s for protocol %s", ErrConfigKeyMissing, configKey, protocol)
}
}
return nil
}
func bindEnvs() error {
envs := []string{
"messaging.protocol",
"messaging.nats.url",
"messaging.nats.queueGroup",
"messaging.nats.timeOutInSec",
"messaging.natsJetstream.url",
"messaging.natsJetstream.queueGroup",
"messaging.natsJetstream.streamType",
"messaging.natsJetstream.timeOutInSec",
"messaging.kafka.url",
"messaging.kafka.groupId",
"messaging.kafka.clientId",
"messaging.mqtt.url",
"messaging.mqtt.clientId",
"messaging.ampq.url",
"messaging.ampq.clientId",
"messaging.http.url",
"messaging.http.port",
"messaging.http.path",
}
for _, env := range envs {
err := cloudEventProviderViper.BindEnv(env)
if err != nil {
return fmt.Errorf("could not bind env %s: %w", env, err)
}
}
return nil
}
func readConfig() {
cloudEventProviderViper.SetConfigName("config")
cloudEventProviderViper.SetConfigType("yaml")
......@@ -134,7 +169,7 @@ func readConfig() {
var configFileNotFoundError viper.ConfigFileNotFoundError
if errors.As(err, &configFileNotFoundError) {
log.Printf("Configuration not found but environment variables will be taken into account.")
cloudEventProviderViper.AutomaticEnv()
}
cloudEventProviderViper.AutomaticEnv()
}
}
......@@ -7,10 +7,10 @@ import (
func TestOptionalConfig(t *testing.T) {
// optional config timeoutInSec is missing
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVENTPROVIDER_NATS_QUEUEGROUP", "test")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_QUEUEGROUP", "test")
err := loadConfig()
if err != nil {
......@@ -20,8 +20,8 @@ func TestOptionalConfig(t *testing.T) {
func TestMissingConfig(t *testing.T) {
// subject config is missing
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_MESSAGING_NATS_URL", "http://localhost:4222")
err := loadConfig()
if !errors.Is(errors.Unwrap(err), ErrConfigKeyMissing) {
......
......@@ -3,21 +3,21 @@ package cloudeventprovider
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/nats-io/nats.go"
"log"
"net"
"net/url"
"strings"
"time"
"github.com/Azure/go-amqp"
"github.com/Shopify/sarama"
ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
mqttPaho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"github.com/nats-io/nats.go"
)
type cloudEventConnection interface {
......@@ -61,7 +61,7 @@ func newCloudEventConnection(protocolType ProtocolType, connectionType Connectio
}
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
kafkaConfig := currentCloudEventConfig.Kafka
kafkaConfig := currentCloudEventConfig.Messaging.Kafka
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
......@@ -84,7 +84,7 @@ func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEvent
}
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsConfig := currentCloudEventConfig.Nats
natsConfig := currentCloudEventConfig.Messaging.Nats
var natsOptions []nats.Option
if natsConfig.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(natsConfig.TimeoutInSec*time.Second))
......@@ -115,7 +115,7 @@ func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventC
}
return requester, nil
case Rep:
respondConsumer, err := newNatsRespondConsumer(natsConfig.Url, topic, natsOptions...)
respondConsumer, err := newNatsRespondConsumer(natsConfig.Url, topic, natsConfig.QueueGroup, natsOptions...)
if err != nil {
return nil, err
}
......@@ -126,7 +126,7 @@ func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventC
}
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsJetstreamConfig := currentCloudEventConfig.NatsJetstream
natsJetstreamConfig := currentCloudEventConfig.Messaging.NatsJetstream
var natsJetstreamOptions []nats.Option
if natsJetstreamConfig.TimeoutInSec != 0*time.Second {
natsJetstreamOptions = append(natsJetstreamOptions, nats.Timeout(natsJetstreamConfig.TimeoutInSec*time.Second))
......@@ -156,7 +156,7 @@ func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cl
}
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
mqttConfig := currentCloudEventConfig.Mqtt
mqttConfig := currentCloudEventConfig.Messaging.Mqtt
ctx := context.Background()
conn, err := net.Dial("tcp", mqttConfig.Url)
......@@ -187,8 +187,11 @@ func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventC
Conn: conn,
}
subscribeOpt := &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
topic: {QoS: 0},
Subscriptions: []paho.SubscribeOptions{
{
Topic: topic,
QoS: 0,
},
},
}
......@@ -223,7 +226,7 @@ func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEventC
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = currentCloudEventConfig.Ampq.Url
amqpUrl = currentCloudEventConfig.Messaging.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
if err != nil {
log.Fatal(err)
......
module gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider
go 1.20
go 1.21.4
require (
github.com/Shopify/sarama v1.38.1
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20240112101907-998919d6876d
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.14.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/google/uuid v1.4.0
github.com/eclipse/paho.golang v0.12.0
github.com/google/uuid v1.5.0
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.32.0
github.com/spf13/viper v1.18.2
)
require (
github.com/Azure/go-amqp v0.17.0 // indirect
github.com/IBM/sarama v1.42.0 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 // indirect
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.14.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/eclipse/paho.golang v0.11.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
......@@ -31,37 +32,34 @@ require (
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats.go v1.30.2 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.17.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
This diff is collapsed.
......@@ -36,12 +36,13 @@ type natsRespondConsumer struct {
natsReceiver
Conn *nats.Conn
Subject string
QueueGroup string
subMtx sync.Mutex
internalClose chan int
connOwned bool
}
func newNatsRespondConsumer(url string, subject string, natsOptions ...nats.Option) (*natsRespondConsumer, error) {
func newNatsRespondConsumer(url string, subject string, queueGroup string, natsOptions ...nats.Option) (*natsRespondConsumer, error) {
conn, err := nats.Connect(url, natsOptions...)
if err != nil {
return nil, err
......@@ -51,6 +52,7 @@ func newNatsRespondConsumer(url string, subject string, natsOptions ...nats.Opti
natsReceiver: *newNatsReceiver(),
Conn: conn,
Subject: subject,
QueueGroup: queueGroup,
internalClose: make(chan int),
connOwned: true,
}
......@@ -63,7 +65,14 @@ func (c *natsRespondConsumer) Reply(ctx context.Context, responseFunction func(c
defer c.subMtx.Unlock()
// Subscribe
sub, err := c.Conn.Subscribe(c.Subject, c.MsgHandler)
var sub *nats.Subscription
var err error
if c.QueueGroup != "" {
sub, err = c.Conn.QueueSubscribe(c.Subject, c.QueueGroup, c.MsgHandler)
} else {
sub, err = c.Conn.Subscribe(c.Subject, c.MsgHandler)
}
if err != nil {
return err
}
......
# Binary files (no line-ending conversions), diff using hexdump
*.bin binary diff=hex
amqp.test
/fuzz/*/*
!/fuzz/*/corpus
/fuzz/*.zip
*.log
/cmd
cover.out
.envrc
recordings
.vscode
.idea
*.env
\ No newline at end of file
# Microsoft Open Source Code of Conduct
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
Resources:
- [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/)
- [Microsoft Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/)
- Contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with questions or concerns
# Contributing
This repo is no longer under active development. See [issue #205](https://github.com/vcabbage/amqp/issues/205) for details.
~~Whether it's code, documentation, and/or example, all contributions are appreciated.~~
~~To ensure a smooth process, here are some guidelines and expectations:~~
* ~~An issue should be created discussing any non-trivial change. Small changes, such as a fixing a typo, don't need an issue.~~
* ~~Ideally, an issue should describe both the problem to be solved and a proposed solution.~~
* ~~Please indicate that you want to work on the change in the issue. If you change your mind about working on an issue you are always free to back out. There will be no hard feelings.~~
* ~~Depending on the scope, there may be some back and forth about the problem and solution. This is intended to be a collaborative discussion to ensure the problem is adequately solved in a manner that fits well in the library.~~
~~Once you're ready to open a PR:~~
* ~~Ensure code is formatted with `gofmt`.~~
* ~~You may also want to peruse https://github.com/golang/go/wiki/CodeReviewComments and check that code conforms to the recommendations.~~
* ~~Tests are appreciated, but not required. The integration tests are currently specific to Microsoft Azure and require a number of credentials provided via environment variables. This can be a high barrier if you don't already have setup that works with the tests.~~
* ~~When you open the PR CI will run unit tests. Integration tests will be run manually as part of the review.~~
* ~~All PRs will be merged as a single commit. If your PR includes multiple commits they will be squashed together before merging. This usually isn't a big deal, but if you have any questions feel free to ask.~~
~~I do my best to respond to issues and PRs in a timely fashion. If it's been a couple days without a response or if it seems like I've overlooked something, feel free to ping me.~~
## Debugging
### Logging
To enable debug logging, build with `-tags debug`. This enables debug level 1 by default. You can increase the level by setting the `DEBUG_LEVEL` environment variable to 2 or higher. (Debug logging is disabled entirely without `-tags debug`, regardless of `DEBUG_LEVEL` setting.)
To add additional logging, use the `debug(level int, format string, v ...interface{})` function, which is similar to `fmt.Printf` but takes a level as it's first argument.
### Packet Capture
Wireshark can be very helpful in diagnosing interactions between client and server. If the connection is not encrypted Wireshark can natively decode AMQP 1.0. If the connection is encrypted with TLS you'll need to log out the keys.
Example of logging the TLS keys:
```go
// Create the file
f, err := os.OpenFile("key.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
// Configure TLS
tlsConfig := &tls.Config{
KeyLogWriter: f,
}
// Dial the host
const host = "my.amqp.server"
conn, err := tls.Dial("tcp", host+":5671", tlsConfig)
// Create the connections
client, err := amqp.New(conn,
amqp.ConnSASLPlain("username", "password"),
amqp.ConnServerHostname(host),
)
```
You'll need to configure Wireshark to read the key.log file in Preferences > Protocols > SSL > (Pre)-Master-Secret log filename.
MIT License
Copyright (C) 2017 Kale Blankenship
Portions Copyright (C) Microsoft Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
PACKAGE := github.com/Azure/go-amqp
FUZZ_DIR := ./fuzz
all: test
fuzzconn:
go-fuzz-build -o $(FUZZ_DIR)/conn.zip -func FuzzConn $(PACKAGE)
go-fuzz -bin $(FUZZ_DIR)/conn.zip -workdir $(FUZZ_DIR)/conn
fuzzmarshal:
go-fuzz-build -o $(FUZZ_DIR)/marshal.zip -func FuzzUnmarshal $(PACKAGE)
go-fuzz -bin $(FUZZ_DIR)/marshal.zip -workdir $(FUZZ_DIR)/marshal
fuzzclean:
rm -f $(FUZZ_DIR)/**/{crashers,suppressions}/*
rm -f $(FUZZ_DIR)/*.zip
test:
TEST_CORPUS=1 go test -race -run=Corpus
go test -v -race ./...
#integration:
#go test -tags "integration" -count=1 -v -race .
test386:
TEST_CORPUS=1 go test -count=1 -v .
ci: test386 coverage
coverage:
TEST_CORPUS=1 go test -cover -coverprofile=cover.out -v
# **github.com/Azure/go-amqp**
[![Build Status](https://dev.azure.com/azure-sdk/public/_apis/build/status/go/Azure.go-amqp?branchName=master)](https://dev.azure.com/azure-sdk/public/_build/latest?definitionId=1292&branchName=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/Azure/go-amqp)](https://goreportcard.com/report/github.com/Azure/go-amqp)
[![GoDoc](https://godoc.org/github.com/Azure/go-amqp?status.svg)](http://godoc.org/github.com/Azure/go-amqp)
[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/Azure/go-amqp/master/LICENSE)
github.com/Azure/go-amqp is an AMQP 1.0 client implementation for Go.
[AMQP 1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html) is not compatible with AMQP 0-9-1 or 0-10, which are
the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other
AMQP 1.0 resources can be found at [github.com/xinchen10/awesome-amqp](https://github.com/xinchen10/awesome-amqp).
This library aims to be stable and worthy of production usage, but the API is still subject to change. To conform with SemVer, the major version will remain 0 until the API is deemed stable. During this period breaking changes will be indicated by bumping the minor version. Non-breaking changes will bump the patch version.
## Install
```
go get -u github.com/Azure/go-amqp
```
## Contributing
Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md).
## Example Usage
``` go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
// Create client
client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
amqp.ConnSASLPlain("access-key-name", "access-key"),
)
if err != nil {
log.Fatal("Dialing AMQP server:", err)
}
defer client.Close()
// Open a session
session, err := client.NewSession()
if err != nil {
log.Fatal("Creating AMQP session:", err)
}
ctx := context.Background()
// Send a message
{
// Create a sender
sender, err := session.NewSender(
amqp.LinkTargetAddress("/queue-name"),
)
if err != nil {
log.Fatal("Creating sender link:", err)
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// Send message
err = sender.Send(ctx, amqp.NewMessage([]byte("Hello!")))
if err != nil {
log.Fatal("Sending message:", err)
}
sender.Close(ctx)
cancel()
}
// Continuously read messages
{
// Create a receiver
receiver, err := session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(10),
)
if err != nil {
log.Fatal("Creating receiver link:", err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
receiver.Close(ctx)
cancel()
}()
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
log.Fatal("Reading message from AMQP:", err)
}
// Accept message
msg.Accept(context.Background())
fmt.Printf("Message received: %s\n", msg.GetData())
}
}
}
```
## Related Projects
| Project | Description |
|---------|-------------|
| [github.com/Azure/azure-event-hubs-go](https://github.com/Azure/azure-event-hubs-go) * | Library for interacting with Microsoft Azure Event Hubs. |
| [github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus](https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azservicebus) * | Library for interacting with Microsoft Azure Service Bus. |
| [gocloud.dev/pubsub](https://gocloud.dev/pubsub) * | Library for portably interacting with Pub/Sub systems. |
| [qpid-proton](https://github.com/apache/qpid-proton/tree/go1) | AMQP 1.0 library using the Qpid Proton C bindings. |
`*` indicates that the project uses this library.
Feel free to send PRs adding additional projects. Listed projects are not limited to those that use this library as long as they are potentially useful to people who are looking at an AMQP library.
# Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
<!-- BEGIN MICROSOFT SECURITY.MD V0.0.3 BLOCK -->
## Security
Microsoft takes the security of our software products and services seriously, which includes all source code repositories managed through our GitHub organizations, which include [Microsoft](https://github.com/Microsoft), [Azure](https://github.com/Azure), [DotNet](https://github.com/dotnet), [AspNet](https://github.com/aspnet), [Xamarin](https://github.com/xamarin), and [our GitHub organizations](https://opensource.microsoft.com/).
If you believe you have found a security vulnerability in any Microsoft-owned repository that meets Microsoft's [Microsoft's definition of a security vulnerability](https://docs.microsoft.com/en-us/previous-versions/tn-archive/cc751383(v=technet.10)) of a security vulnerability, please report it to us as described below.
## Reporting Security Issues
**Please do not report security vulnerabilities through public GitHub issues.**
Instead, please report them to the Microsoft Security Response Center (MSRC) at [https://msrc.microsoft.com/create-report](https://msrc.microsoft.com/create-report).
If you prefer to submit without logging in, send email to [secure@microsoft.com](mailto:secure@microsoft.com). If possible, encrypt your message with our PGP key; please download it from the the [Microsoft Security Response Center PGP Key page](https://www.microsoft.com/en-us/msrc/pgp-key-msrc).
You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Additional information can be found at [microsoft.com/msrc](https://www.microsoft.com/msrc).
Please include the requested information listed below (as much as you can provide) to help us better understand the nature and scope of the possible issue:
* Type of issue (e.g. buffer overflow, SQL injection, cross-site scripting, etc.)
* Full paths of source file(s) related to the manifestation of the issue
* The location of the affected source code (tag/branch/commit or direct URL)
* Any special configuration required to reproduce the issue
* Step-by-step instructions to reproduce the issue
* Proof-of-concept or exploit code (if possible)
* Impact of the issue, including how an attacker might exploit the issue
This information will help us triage your report more quickly.
If you are reporting for a bug bounty, more complete reports can contribute to a higher bounty award. Please visit our [Microsoft Bug Bounty Program](https://microsoft.com/msrc/bounty) page for more details about our active programs.
## Preferred Languages
We prefer all communications to be in English.
## Policy
Microsoft follows the principle of [Coordinated Vulnerability Disclosure](https://www.microsoft.com/en-us/msrc/cvd).
<!-- END MICROSOFT SECURITY.MD BLOCK -->
variables:
GO111MODULE: 'on'
AMQP_BROKER_ADDR: 'amqp://127.0.0.1:25672'
jobs:
- job: 'goamqp'
displayName: 'Run go-amqp CI Checks'
strategy:
matrix:
Linux_Go116:
vm.image: 'ubuntu-18.04'
go.version: '1.16.11'
Linux_Go117:
vm.image: 'ubuntu-18.04'
go.version: '1.17.4'
pool:
vmImage: '$(vm.image)'
steps:
- task: GoTool@0
inputs:
version: '$(go.version)'
displayName: "Select Go Version"
- script: |
set -e
export gopathbin=$(go env GOPATH)/bin
echo "##vso[task.prependpath]$gopathbin"
go get github.com/jstemmer/go-junit-report
go get github.com/axw/gocov/gocov
go get github.com/AlekSi/gocov-xml
go get github.com/matm/gocov-html
displayName: 'Install Dependencies'
- script: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.42.0
golangci-lint --version
golangci-lint run
displayName: 'Install and Run GoLintCLI.'
- script: |
go build -v ./...
displayName: 'Build'
- script: |
go vet ./...
displayName: 'Vet'
- task: UseDotNet@2
displayName: 'Use .NET sdk'
inputs:
packageType: sdk
version: 5.0.x
installationPath: $(Agent.ToolsDirectory)/dotnet
- script: |
git clone https://github.com/jhendrixMSFT/azure-amqp $(Pipeline.Workspace)/azure-amqp
pushd $(Pipeline.Workspace)/azure-amqp/test/TestAmqpBroker
dotnet restore
dotnet build
chmod +x $(Pipeline.Workspace)/azure-amqp/bin/Debug/TestAmqpBroker/net461/TestAmqpBroker.exe
displayName: 'Clone and Build Broker'
- script: |
set -e
echo '##[command]Starting broker at $(AMQP_BROKER_ADDR)'
$(Pipeline.Workspace)/azure-amqp/bin/Debug/TestAmqpBroker/net461/TestAmqpBroker.exe $AMQP_BROKER_ADDR /headless &
brokerPID=$!
echo '##[section]Starting tests'
go test -tags -race -v -coverprofile=coverage.txt -covermode atomic ./... 2>&1 | tee gotestoutput.log
go-junit-report < gotestoutput.log > report.xml
kill $brokerPID
gocov convert coverage.txt > coverage.json
gocov-xml < coverage.json > coverage.xml
gocov-html < coverage.json > coverage.html
displayName: 'Run Tests'
- script: |
gofmt -s -l -w . >&2
displayName: 'Format Check'
failOnStderr: true
condition: succeededOrFailed()
- task: PublishTestResults@2
inputs:
testRunner: JUnit
testResultsFiles: report.xml
failTaskOnFailedTests: true
- task: PublishCodeCoverageResults@1
inputs:
codeCoverageTool: Cobertura
summaryFileLocation: coverage.xml
additionalCodeCoverageFiles: coverage.html
package amqp
import (
"encoding/base64"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Client is an AMQP client connection.
type Client struct {
conn *conn
}
// Dial connects to an AMQP server.
//
// If the addr includes a scheme, it must be "amqp" or "amqps".
// If no port is provided, 5672 will be used for "amqp" and 5671 for "amqps".
//
// If username and password information is not empty it's used as SASL PLAIN
// credentials, equal to passing ConnSASLPlain option.
func Dial(addr string, opts ...ConnOption) (*Client, error) {
c, err := dialConn(addr, opts...)
if err != nil {
return nil, err
}
err = c.Start()
if err != nil {
return nil, err
}
return &Client{conn: c}, nil
}
// New establishes an AMQP client connection over conn.
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
if err != nil {
return nil, err
}
err = c.Start()
return &Client{conn: c}, err
}
// Close disconnects the connection.
func (c *Client) Close() error {
return c.conn.Close()
}
// NewSession opens a new AMQP session to the server.
// Returns ErrConnClosed if the underlying connection has been closed.
func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
// get a session allocated by Client.mux
var sResp newSessionResp
select {
case <-c.conn.Done:
return nil, c.conn.Err()
case sResp = <-c.conn.NewSession:
}
if sResp.err != nil {
return nil, sResp.err
}
s := sResp.session
for _, opt := range opts {
err := opt(s)
if err != nil {
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, err
}
}
// send Begin to server
begin := &frames.PerformBegin{
NextOutgoingID: 0,
IncomingWindow: s.incomingWindow,
OutgoingWindow: s.outgoingWindow,
HandleMax: s.handleMax,
}
debug(1, "TX (NewSession): %s", begin)
_ = s.txFrame(begin, nil)
// wait for response
var fr frames.Frame
select {
case <-c.conn.Done:
return nil, c.conn.Err()
case fr = <-s.rx:
}
debug(1, "RX (NewSession): %s", fr.Body)
begin, ok := fr.Body.(*frames.PerformBegin)
if !ok {
// this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not
// yet seen the remote channel number, the default clause in conn.mux will protect us from that.
// if we have seen the remote channel number then it's likely the session.mux for that channel will
// either swallow the frame or blow up in some other way, both causing this call to hang.
// deallocate session on error. we can't call
// s.Close() as the session mux hasn't started yet.
c.conn.DelSession <- s
return nil, fmt.Errorf("unexpected begin response: %+v", fr.Body)
}
// start Session multiplexor
go s.mux(begin)
return s, nil
}
// Default session options
const (
DefaultMaxLinks = 4294967296
DefaultWindow = 1000
)
// SessionOption is an function for configuring an AMQP session.
type SessionOption func(*Session) error
// SessionIncomingWindow sets the maximum number of unacknowledged
// transfer frames the server can send.
func SessionIncomingWindow(window uint32) SessionOption {
return func(s *Session) error {
s.incomingWindow = window
return nil
}
}
// SessionOutgoingWindow sets the maximum number of unacknowledged
// transfer frames the client can send.
func SessionOutgoingWindow(window uint32) SessionOption {
return func(s *Session) error {
s.outgoingWindow = window
return nil
}
}
// SessionMaxLinks sets the maximum number of links (Senders/Receivers)
// allowed on the session.
//
// n must be in the range 1 to 4294967296.
//
// Default: 4294967296.
func SessionMaxLinks(n int) SessionOption {
return func(s *Session) error {
if n < 1 {
return errors.New("max sessions cannot be less than 1")
}
if int64(n) > 4294967296 {
return errors.New("max sessions cannot be greater than 4294967296")
}
s.handleMax = uint32(n - 1)
return nil
}
}
// lockedRand provides a rand source that is safe for concurrent use.
type lockedRand struct {
mu sync.Mutex
src *rand.Rand
}
func (r *lockedRand) Read(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Read(p)
}
// package scoped rand source to avoid any issues with seeding
// of the global source.
var pkgRand = &lockedRand{
src: rand.New(rand.NewSource(time.Now().UnixNano())),
}
// randBytes returns a base64 encoded string of n bytes.
func randString(n int) string {
b := make([]byte, n)
// from math/rand, cannot fail
_, _ = pkgRand.Read(b)
return base64.RawURLEncoding.EncodeToString(b)
}
// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple
// (source-container-id, target-container-id, name)
// On a single connection the container ID pairs can be abbreviated
// to a boolean flag indicating the direction of the link.
type linkKey struct {
name string
role encoding.Role // Local role: sender/receiver
}
const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader
This diff is collapsed.
package amqp
import "github.com/Azure/go-amqp/internal/encoding"
// Sender Settlement Modes
const (
// Sender will send all deliveries initially unsettled to the receiver.
ModeUnsettled = encoding.ModeUnsettled
// Sender will send all deliveries settled to the receiver.
ModeSettled = encoding.ModeSettled
// Sender MAY send a mixture of settled and unsettled deliveries to the receiver.
ModeMixed = encoding.ModeMixed
)
// SenderSettleMode specifies how the sender will settle messages.
type SenderSettleMode = encoding.SenderSettleMode
func senderSettleModeValue(m *SenderSettleMode) SenderSettleMode {
if m == nil {
return ModeMixed
}
return *m
}
// Receiver Settlement Modes
const (
// Receiver will spontaneously settle all incoming transfers.
ModeFirst = encoding.ModeFirst
// Receiver will only settle after sending the disposition to the
// sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ModeSecond = encoding.ModeSecond
)
// ReceiverSettleMode specifies how the receiver will settle messages.
type ReceiverSettleMode = encoding.ReceiverSettleMode
func receiverSettleModeValue(m *ReceiverSettleMode) ReceiverSettleMode {
if m == nil {
return ModeFirst
}
return *m
}
// Durability Policies
const (
// No terminus state is retained durably.
DurabilityNone = encoding.DurabilityNone
// Only the existence and configuration of the terminus is
// retained durably.
DurabilityConfiguration = encoding.DurabilityConfiguration
// In addition to the existence and configuration of the
// terminus, the unsettled state for durable messages is
// retained durably.
DurabilityUnsettledState = encoding.DurabilityUnsettledState
)
// Durability specifies the durability of a link.
type Durability = encoding.Durability
// Expiry Policies
const (
// The expiry timer starts when terminus is detached.
ExpiryLinkDetach = encoding.ExpiryLinkDetach
// The expiry timer starts when the most recently
// associated session is ended.
ExpirySessionEnd = encoding.ExpirySessionEnd
// The expiry timer starts when most recently associated
// connection is closed.
ExpiryConnectionClose = encoding.ExpiryConnectionClose
// The terminus never expires.
ExpiryNever = encoding.ExpiryNever
)
// ExpiryPolicy specifies when the expiry timer of a terminus
// starts counting down from the timeout value.
//
// If the link is subsequently re-attached before the terminus is expired,
// then the count down is aborted. If the conditions for the
// terminus-expiry-policy are subsequently re-met, the expiry timer restarts
// from its originally configured timeout value.
type ExpiryPolicy = encoding.ExpiryPolicy
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment