Skip to content
Snippets Groups Projects
Commit 3439b92c authored by Manuel Heß's avatar Manuel Heß
Browse files

add new config structure to differentiate between protocols

parent f1f24427
No related branches found
No related tags found
No related merge requests found
......@@ -7,26 +7,52 @@ import (
"github.com/cloudevents/sdk-go/v2/event"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/connection"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
)
type CloudEvtClient struct {
context context.Context
protocol model.ProtocolType
conn connection.CloudEvtConnection
connectionClient client.Client
connectionType connection.ConnectionType
connectionType model.ConnectionType
alive bool
}
func NewClient(connectionType connection.ConnectionType) (*CloudEvtClient, error) {
conn, err := connection.NewCloudEvtConnection(config.CurrentCloudEvtConfig.ProtocolType, connectionType)
func NewClient(connectionType model.ConnectionType) (*CloudEvtClient, error) {
protocol := config.CurrentCloudEvtConfig.Protocol
connectionClient, err := cloudevents.NewClient(conn)
var conn connection.CloudEvtConnection
var connectionClient client.Client
ctx := context.Background()
var err error
if protocol == model.Http {
httpConfig := config.CurrentCloudEvtConfig.Http
connectionClient, err = cloudevents.NewClientHTTP()
if connectionType == model.Pub {
ctx = cloudevents.ContextWithTarget(ctx, httpConfig.Url)
}
} else {
conn, err := connection.NewCloudEvtConnection(protocol, connectionType)
if err != nil {
return nil, err
}
connectionClient, err = cloudevents.NewClient(conn)
if err != nil {
return nil, err
}
}
if err != nil {
return nil, err
}
newClient := &CloudEvtClient{
context: context.Background(),
context: ctx,
protocol: protocol,
conn: conn,
connectionClient: connectionClient,
connectionType: connectionType,
......@@ -37,6 +63,10 @@ func NewClient(connectionType connection.ConnectionType) (*CloudEvtClient, error
}
func (c *CloudEvtClient) Close() error {
if c.protocol == model.Http {
return nil
}
if err := c.conn.Close(c.context); err != nil {
return err
}
......
......@@ -6,13 +6,9 @@ import (
"github.com/google/uuid"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/client"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/connection"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
)
func init() {
config.LoadConfig()
}
func CreateEvent(eventSource string, eventType string, data map[string]any) (event.Event, error) {
newEvent := cloudevents.NewEvent()
newEvent.SetID(uuid.New().String())
......@@ -25,6 +21,9 @@ func CreateEvent(eventSource string, eventType string, data map[string]any) (eve
return newEvent, nil
}
func CreateClient(connectionType connection.ConnectionType) (*client.CloudEvtClient, error) {
func CreateClient(connectionType model.ConnectionType) (*client.CloudEvtClient, error) {
if err := config.LoadConfig(); err != nil {
return nil, err
}
return client.NewClient(connectionType)
}
......@@ -2,31 +2,112 @@ package config
import (
"errors"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/connection"
"fmt"
"github.com/fatih/structs"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
"log"
"strings"
)
import "github.com/spf13/viper"
type CloudEvtConfiguration struct {
ProtocolType connection.ProtocolType `yaml:"protocolType"`
ConnectionUrl string `yaml:"connectionUrl"`
Subject string `yaml:"subject"`
type CloudEvtProvConfiguration struct {
Protocol model.ProtocolType `mapstructure:"protocol"`
Nats struct {
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
} `mapstructure:"nats"`
NatsJetstream struct {
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
StreamType string `mapstructure:"streamType"`
} `mapstructure:"natsJetstream"`
Kafka struct {
Url string `mapstructure:"url"`
Topic string `mapstructure:"topic"`
GroupId string `mapstructure:"groupId"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"kafka"`
Mqtt struct {
Url string `mapstructure:"url"`
Topic string `mapstructure:"topic"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"mqtt"`
Ampq struct {
Url string `mapstructure:"url"`
Topic string `mapstructure:"topic"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"amqp"`
Http struct {
Url string `mapstructure:"url"`
} `mapstructure:"http"`
}
var CurrentCloudEvtConfig CloudEvtConfiguration
var CurrentCloudEvtConfig CloudEvtProvConfiguration
func LoadConfig() {
setDefaults()
func LoadConfig() error {
readConfig()
if err := viper.Unmarshal(&CurrentCloudEvtConfig); err != nil {
panic(err)
return err
}
if err := checkConfig(); err != nil {
return err
}
return nil
}
func checkConfig() error {
// TODO: optional configs
if viper.IsSet("protocol") {
protocol := viper.GetString("protocol")
err := checkIfProtocolConfigIsSet(protocol)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("protocol is not set")
}
}
func checkIfProtocolConfigIsSet(protocol string) error {
//TODO: loop trough config -> no need for switch
var err error
switch protocol {
case string(model.Http):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Http)
case string(model.Nats):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Nats)
case string(model.NatsJetstream):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.NatsJetstream)
case string(model.Kafka):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Kafka)
case string(model.Mqtt):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Mqtt)
case string(model.Amqp):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
}
return err
}
func setDefaults() {
viper.SetDefault("connectionType", "nats")
viper.SetDefault("connectionUrl", "http://localhost:4222")
viper.SetDefault("subject", "events")
func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
for _, configKey := range structs.Names(config) {
if !viper.IsSet(protocol + "." + configKey) {
return fmt.Errorf("config key %s for protocol %s is not set", configKey, protocol)
}
}
return nil
}
func readConfig() {
......@@ -42,5 +123,6 @@ func readConfig() {
}
viper.SetEnvPrefix("MESSAGING")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
}
......@@ -3,79 +3,76 @@ package connection
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"github.com/Shopify/sarama"
ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
mqttPaho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
"log"
"net"
"net/url"
"strings"
)
type CloudEvtConnection interface {
Close(ctx context.Context) error
}
type ProtocolType string
type ConnectionType string
const (
Kafka ProtocolType = "kafka"
Nats ProtocolType = "nats"
NatsJetstream ProtocolType = "natsJetstream"
Mqtt ProtocolType = "mqtt"
)
const (
Pub ConnectionType = "pub"
Sub ConnectionType = "sub"
)
func NewCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionType) (CloudEvtConnection, error) {
func NewCloudEvtConnection(protocolType model.ProtocolType, connectionType model.ConnectionType) (CloudEvtConnection, error) {
var newConnection CloudEvtConnection
var err error
switch protocolType {
case Kafka:
case model.Kafka:
newConnection, err = newKafkaConnection(connectionType)
if err != nil {
return nil, err
}
case Nats:
case model.Nats:
newConnection, err = newNatsConnection(connectionType)
if err != nil {
return nil, err
}
case NatsJetstream:
case model.NatsJetstream:
newConnection, err = newNatsJetstreamConnection(connectionType)
if err != nil {
return nil, err
}
case Mqtt:
case model.Mqtt:
newConnection, err = newMqttConnection(connectionType)
if err != nil {
return nil, err
}
case model.Amqp:
newConnection, err = newAmqpConnection(connectionType)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown protocolType: %s. Could not create CloudEvtConnection", protocolType)
}
return newConnection, nil
}
func newKafkaConnection(connectionType ConnectionType) (CloudEvtConnection, error) {
func newKafkaConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
kafkaConfig := config.CurrentCloudEvtConfig.Kafka
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
switch connectionType {
case Pub:
sender, err := kafka_sarama.NewSender([]string{config.CurrentCloudEvtConfig.ConnectionUrl}, saramaConfig, "test-topic")
case model.Pub:
sender, err := kafka_sarama.NewSender([]string{kafkaConfig.Url}, saramaConfig, kafkaConfig.Topic)
if err != nil {
return nil, err
}
return sender, nil
case Sub:
receiver, err := kafka_sarama.NewConsumer([]string{config.CurrentCloudEvtConfig.ConnectionUrl}, saramaConfig, "test-group-id", "test-topic")
case model.Sub:
receiver, err := kafka_sarama.NewConsumer([]string{kafkaConfig.Url}, saramaConfig, kafkaConfig.GroupId, kafkaConfig.Topic)
if err != nil {
return nil, err
}
......@@ -85,16 +82,18 @@ func newKafkaConnection(connectionType ConnectionType) (CloudEvtConnection, erro
}
}
func newNatsConnection(connectionType ConnectionType) (CloudEvtConnection, error) {
func newNatsConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
natsConfig := config.CurrentCloudEvtConfig.Nats
switch connectionType {
case Pub:
sender, err := cenats.NewSender(config.CurrentCloudEvtConfig.ConnectionUrl, config.CurrentCloudEvtConfig.Subject, cenats.NatsOptions())
case model.Pub:
sender, err := cenats.NewSender(natsConfig.Url, natsConfig.Subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
return sender, nil
case Sub:
consumer, err := cenats.NewConsumer(config.CurrentCloudEvtConfig.ConnectionUrl, config.CurrentCloudEvtConfig.Subject, cenats.NatsOptions())
case model.Sub:
consumer, err := cenats.NewConsumer(natsConfig.Url, natsConfig.Subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
......@@ -104,37 +103,40 @@ func newNatsConnection(connectionType ConnectionType) (CloudEvtConnection, error
}
}
func newNatsJetstreamConnection(connectionType ConnectionType) (CloudEvtConnection, error) {
func newNatsJetstreamConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
natsJetstreamConfig := config.CurrentCloudEvtConfig.NatsJetstream
switch connectionType {
case Pub:
sender, err := cejsm.NewSender(config.CurrentCloudEvtConfig.ConnectionUrl, "ORDER", config.CurrentCloudEvtConfig.Subject, nil, nil)
case model.Pub:
sender, err := cejsm.NewSender(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, nil, nil)
if err != nil {
return nil, err
}
return sender, nil
case Sub:
consumer, err := cejsm.NewConsumer(config.CurrentCloudEvtConfig.ConnectionUrl, "ORDER", config.CurrentCloudEvtConfig.Subject, cejsm.NatsOptions(), nil, nil)
case model.Sub:
consumer, err := cejsm.NewConsumer(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, cejsm.NatsOptions(), nil, nil)
if err != nil {
return nil, err
}
return consumer, nil
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsConnection", connectionType)
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsJetstreamConnection", connectionType)
}
}
func newMqttConnection(connectionType ConnectionType) (CloudEvtConnection, error) {
func newMqttConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
mqttConfig := config.CurrentCloudEvtConfig.Mqtt
ctx := context.Background()
conn, err := net.Dial("tcp", config.CurrentCloudEvtConfig.ConnectionUrl)
conn, err := net.Dial("tcp", mqttConfig.Url)
if err != nil {
return nil, err
}
switch connectionType {
case Pub:
case model.Pub:
connectionConfig := &paho.ClientConfig{
ClientID: "sender-client-id",
ClientID: mqttConfig.ClientId,
Conn: conn,
}
// optional connect option
......@@ -143,19 +145,19 @@ func newMqttConnection(connectionType ConnectionType) (CloudEvtConnection, error
CleanStart: true,
}
sender, err := mqttPaho.New(ctx, connectionConfig, mqttPaho.WithPublish(&paho.Publish{Topic: config.CurrentCloudEvtConfig.Subject}), mqttPaho.WithConnect(connOpt))
sender, err := mqttPaho.New(ctx, connectionConfig, mqttPaho.WithPublish(&paho.Publish{Topic: mqttConfig.Topic}), mqttPaho.WithConnect(connOpt))
if err != nil {
return nil, err
}
return sender, nil
case Sub:
case model.Sub:
connectionConfig := &paho.ClientConfig{
ClientID: "receiver-client-id",
ClientID: mqttConfig.ClientId,
Conn: conn,
}
subscribeOpt := &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
config.CurrentCloudEvtConfig.Subject: {QoS: 0},
mqttConfig.Topic: {QoS: 0},
},
}
......@@ -165,6 +167,34 @@ func newMqttConnection(connectionType ConnectionType) (CloudEvtConnection, error
}
return consumer, nil
default:
return nil, fmt.Errorf("unknown connectionType: %s. Could not create NatsConnection", connectionType)
return nil, fmt.Errorf("unknown connectionType: %s. Could not create MqttConnection", connectionType)
}
}
func newAmqpConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
amqpUrl, node, opts := parseAmqpConfig()
if connectionType == model.Sub || connectionType == model.Pub {
protocol, err := ceamqp.NewProtocol(amqpUrl, node, []amqp.ConnOption{}, []amqp.SessionOption{}, opts...)
if err != nil {
return nil, err
}
return protocol, nil
} else {
return nil, fmt.Errorf("unknown connectionType: %s. Could not create AmqpConnection", connectionType)
}
}
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = config.CurrentCloudEvtConfig.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
if err != nil {
log.Fatal(err)
}
if parsedUrl.User != nil {
user := parsedUrl.User.Username()
pass, _ := parsedUrl.User.Password()
opts = append(opts, ceamqp.WithConnOpt(amqp.ConnSASLPlain(user, pass)))
}
return amqpUrl, strings.TrimPrefix(parsedUrl.Path, "/"), opts
}
......@@ -8,8 +8,10 @@ require (
)
require (
github.com/Azure/go-amqp v0.17.0 // indirect
github.com/IBM/sarama v1.42.0 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 // indirect
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.14.0 // indirect
......@@ -19,6 +21,7 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/eclipse/paho.golang v0.11.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
......
......@@ -36,6 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/IBM/sarama v1.42.0 h1:E5Kp9D5iIxI4b0Y0DYdiXil72v3kHIZMG8qTfWXVh2s=
......@@ -48,6 +50,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0 h1:RHQ4j4gtKyRekiMI2k58cBlf8oJjDYhStfurd0VI25w=
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.14.0/go.mod h1:6C9Iu3TSM1alrSktRBUwDJfOum1CyJe+k2QzlL75xM4=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0 h1:1MCVOxNZySIYOWMI1+6Z7YR0PK3AmDi/Fklk1KdFIv8=
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.14.0/go.mod h1:/B8nchIwQlr00jtE9bR0aoKaag7bO67xPM7r1DXCH4I=
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4=
......@@ -80,6 +84,9 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
......
package model
type ProtocolType string
type ConnectionType string
const (
Http ProtocolType = "http"
Kafka ProtocolType = "kafka"
Nats ProtocolType = "nats"
NatsJetstream ProtocolType = "natsJetstream"
Mqtt ProtocolType = "mqtt"
Amqp ProtocolType = "amqp"
)
const (
Pub ConnectionType = "pub"
Sub ConnectionType = "sub"
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment