Skip to content
Snippets Groups Projects
Commit 1d4bfc52 authored by Manuel Heß's avatar Manuel Heß
Browse files

refactor library structure and add optional config parameter

parent 3439b92c
No related branches found
No related tags found
No related merge requests found
package client
package cloudevtprov
import (
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/connection"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
)
type CloudEvtClient struct {
context context.Context
protocol model.ProtocolType
conn connection.CloudEvtConnection
protocol ProtocolType
conn cloudEvtConnection
connectionClient client.Client
connectionType model.ConnectionType
connectionType ConnectionType
alive bool
}
func NewClient(connectionType model.ConnectionType) (*CloudEvtClient, error) {
protocol := config.CurrentCloudEvtConfig.Protocol
func newClient(connectionType ConnectionType) (*CloudEvtClient, error) {
protocol := CurrentCloudEvtConfig.Protocol
var conn connection.CloudEvtConnection
var conn cloudEvtConnection
var connectionClient client.Client
ctx := context.Background()
var err error
if protocol == model.Http {
httpConfig := config.CurrentCloudEvtConfig.Http
if protocol == Http {
httpConfig := CurrentCloudEvtConfig.Http
connectionClient, err = cloudevents.NewClientHTTP()
if connectionType == model.Pub {
if connectionType == Pub {
ctx = cloudevents.ContextWithTarget(ctx, httpConfig.Url)
}
} else {
conn, err := connection.NewCloudEvtConnection(protocol, connectionType)
conn, err := newCloudEvtConnection(protocol, connectionType)
if err != nil {
return nil, err
}
......@@ -63,7 +60,7 @@ func NewClient(connectionType model.ConnectionType) (*CloudEvtClient, error) {
}
func (c *CloudEvtClient) Close() error {
if c.protocol == model.Http {
if c.protocol == Http {
return nil
}
......
......@@ -4,9 +4,6 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/client"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
)
func CreateEvent(eventSource string, eventType string, data map[string]any) (event.Event, error) {
......@@ -21,9 +18,9 @@ func CreateEvent(eventSource string, eventType string, data map[string]any) (eve
return newEvent, nil
}
func CreateClient(connectionType model.ConnectionType) (*client.CloudEvtClient, error) {
if err := config.LoadConfig(); err != nil {
return nil, err
func CreateClient(connectionType ConnectionType) (*CloudEvtClient, error) {
if err := loadConfig(); err != nil {
panic(err)
}
return client.NewClient(connectionType)
return newClient(connectionType)
}
package config
package cloudevtprov
import (
"errors"
"fmt"
"github.com/fatih/structs"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
"github.com/mitchellh/mapstructure"
"log"
"strings"
"time"
)
import "github.com/spf13/viper"
var cloudEvtProvViper *viper.Viper
func init() {
cloudEvtProvViper = viper.New()
}
type CloudEvtProvConfiguration struct {
Protocol model.ProtocolType `mapstructure:"protocol"`
Protocol ProtocolType `mapstructure:"protocol"`
Nats struct {
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"nats"`
NatsJetstream struct {
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
StreamType string `mapstructure:"streamType"`
Url string `mapstructure:"url"`
Subject string `mapstructure:"subject"`
QueueGroup string `mapstructure:"queueGroup,omitempty"`
StreamType string `mapstructure:"streamType"`
TimeoutInSec time.Duration `mapstructure:"timeoutInSec,omitempty"`
} `mapstructure:"natsJetstream"`
Kafka struct {
Url string `mapstructure:"url"`
Topic string `mapstructure:"topic"`
GroupId string `mapstructure:"groupId"`
GroupId string `mapstructure:"groupId,omitempty"`
ClientId string `mapstructure:"clientId"`
} `mapstructure:"kafka"`
......@@ -50,10 +60,10 @@ type CloudEvtProvConfiguration struct {
var CurrentCloudEvtConfig CloudEvtProvConfiguration
func LoadConfig() error {
func loadConfig() error {
readConfig()
if err := viper.Unmarshal(&CurrentCloudEvtConfig); err != nil {
if err := cloudEvtProvViper.Unmarshal(&CurrentCloudEvtConfig); err != nil {
return err
}
if err := checkConfig(); err != nil {
......@@ -63,10 +73,8 @@ func LoadConfig() error {
}
func checkConfig() error {
// TODO: optional configs
if viper.IsSet("protocol") {
protocol := viper.GetString("protocol")
if cloudEvtProvViper.IsSet("protocol") {
protocol := cloudEvtProvViper.GetString("protocol")
err := checkIfProtocolConfigIsSet(protocol)
if err != nil {
return err
......@@ -83,17 +91,18 @@ func checkIfProtocolConfigIsSet(protocol string) error {
var err error
switch protocol {
case string(model.Http):
case string(Http):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Http)
case string(model.Nats):
case string(Nats):
//TODO: check for subject name conventions https://docs.nats.io/nats-concepts/subjects
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Nats)
case string(model.NatsJetstream):
case string(NatsJetstream):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.NatsJetstream)
case string(model.Kafka):
case string(Kafka):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Kafka)
case string(model.Mqtt):
case string(Mqtt):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Mqtt)
case string(model.Amqp):
case string(Amqp):
err = checkIfAllConfigKeysAreSet(protocol, CurrentCloudEvtConfig.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
......@@ -102,8 +111,13 @@ func checkIfProtocolConfigIsSet(protocol string) error {
}
func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
for _, configKey := range structs.Names(config) {
if !viper.IsSet(protocol + "." + configKey) {
var configMap map[string]interface{}
if err := mapstructure.Decode(config, &configMap); err != nil {
return fmt.Errorf("could not decode config to map for checking config: %e", err)
}
for configKey := range configMap {
if !cloudEvtProvViper.IsSet(protocol + "." + configKey) {
return fmt.Errorf("config key %s for protocol %s is not set", configKey, protocol)
}
}
......@@ -111,18 +125,18 @@ func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
}
func readConfig() {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
cloudEvtProvViper.SetConfigName("config")
cloudEvtProvViper.SetConfigType("yaml")
cloudEvtProvViper.AddConfigPath(".")
if err := viper.ReadInConfig(); err != nil {
if err := cloudEvtProvViper.ReadInConfig(); err != nil {
var configFileNotFoundError viper.ConfigFileNotFoundError
if errors.As(err, &configFileNotFoundError) {
log.Printf("Configuration not found but environment variables will be taken into account.")
}
}
viper.SetEnvPrefix("MESSAGING")
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
cloudEvtProvViper.SetEnvPrefix("CLOUDEVT")
cloudEvtProvViper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
cloudEvtProvViper.AutomaticEnv()
}
package connection
package cloudevtprov
import (
"context"
......@@ -11,67 +11,67 @@ import (
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/config"
"gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov/model"
"github.com/nats-io/nats.go"
"log"
"net"
"net/url"
"strings"
"time"
)
type CloudEvtConnection interface {
type cloudEvtConnection interface {
Close(ctx context.Context) error
}
func NewCloudEvtConnection(protocolType model.ProtocolType, connectionType model.ConnectionType) (CloudEvtConnection, error) {
var newConnection CloudEvtConnection
func newCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionType) (cloudEvtConnection, error) {
var newConnection cloudEvtConnection
var err error
switch protocolType {
case model.Kafka:
case Kafka:
newConnection, err = newKafkaConnection(connectionType)
if err != nil {
return nil, err
}
case model.Nats:
case Nats:
newConnection, err = newNatsConnection(connectionType)
if err != nil {
return nil, err
}
case model.NatsJetstream:
case NatsJetstream:
newConnection, err = newNatsJetstreamConnection(connectionType)
if err != nil {
return nil, err
}
case model.Mqtt:
case Mqtt:
newConnection, err = newMqttConnection(connectionType)
if err != nil {
return nil, err
}
case model.Amqp:
case Amqp:
newConnection, err = newAmqpConnection(connectionType)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown protocolType: %s. Could not create CloudEvtConnection", protocolType)
return nil, fmt.Errorf("unknown protocolType: %s. Could not create cloudEvtConnection", protocolType)
}
return newConnection, nil
}
func newKafkaConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
kafkaConfig := config.CurrentCloudEvtConfig.Kafka
func newKafkaConnection(connectionType ConnectionType) (cloudEvtConnection, error) {
kafkaConfig := CurrentCloudEvtConfig.Kafka
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
switch connectionType {
case model.Pub:
case Pub:
sender, err := kafka_sarama.NewSender([]string{kafkaConfig.Url}, saramaConfig, kafkaConfig.Topic)
if err != nil {
return nil, err
}
return sender, nil
case model.Sub:
case Sub:
receiver, err := kafka_sarama.NewConsumer([]string{kafkaConfig.Url}, saramaConfig, kafkaConfig.GroupId, kafkaConfig.Topic)
if err != nil {
return nil, err
......@@ -82,18 +82,27 @@ func newKafkaConnection(connectionType model.ConnectionType) (CloudEvtConnection
}
}
func newNatsConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
natsConfig := config.CurrentCloudEvtConfig.Nats
func newNatsConnection(connectionType ConnectionType) (cloudEvtConnection, error) {
natsConfig := CurrentCloudEvtConfig.Nats
var natsOptions []nats.Option
if natsConfig.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(natsConfig.TimeoutInSec*time.Second))
}
switch connectionType {
case model.Pub:
sender, err := cenats.NewSender(natsConfig.Url, natsConfig.Subject, cenats.NatsOptions())
case Pub:
sender, err := cenats.NewSender(natsConfig.Url, natsConfig.Subject, natsOptions)
if err != nil {
return nil, err
}
return sender, nil
case model.Sub:
consumer, err := cenats.NewConsumer(natsConfig.Url, natsConfig.Subject, cenats.NatsOptions())
case Sub:
var consumerOption cenats.ConsumerOption
if natsConfig.QueueGroup != "" {
consumerOption = cenats.WithQueueSubscriber(natsConfig.QueueGroup)
}
consumer, err := cenats.NewConsumer(natsConfig.Url, natsConfig.Subject, natsOptions, consumerOption)
if err != nil {
return nil, err
}
......@@ -103,18 +112,27 @@ func newNatsConnection(connectionType model.ConnectionType) (CloudEvtConnection,
}
}
func newNatsJetstreamConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
natsJetstreamConfig := config.CurrentCloudEvtConfig.NatsJetstream
func newNatsJetstreamConnection(connectionType ConnectionType) (cloudEvtConnection, error) {
natsJetstreamConfig := CurrentCloudEvtConfig.NatsJetstream
var natsJetstreamOptions []nats.Option
if natsJetstreamConfig.TimeoutInSec != 0*time.Second {
natsJetstreamOptions = append(natsJetstreamOptions, nats.Timeout(natsJetstreamConfig.TimeoutInSec*time.Second))
}
switch connectionType {
case model.Pub:
sender, err := cejsm.NewSender(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, nil, nil)
case Pub:
sender, err := cejsm.NewSender(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, natsJetstreamOptions, nil)
if err != nil {
return nil, err
}
return sender, nil
case model.Sub:
consumer, err := cejsm.NewConsumer(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, cejsm.NatsOptions(), nil, nil)
case Sub:
var consumerOption cejsm.ConsumerOption
if natsJetstreamConfig.QueueGroup != "" {
consumerOption = cejsm.WithQueueSubscriber(natsJetstreamConfig.QueueGroup)
}
consumer, err := cejsm.NewConsumer(natsJetstreamConfig.Url, natsJetstreamConfig.StreamType, natsJetstreamConfig.Subject, natsJetstreamOptions, nil, nil, consumerOption)
if err != nil {
return nil, err
}
......@@ -124,8 +142,8 @@ func newNatsJetstreamConnection(connectionType model.ConnectionType) (CloudEvtCo
}
}
func newMqttConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
mqttConfig := config.CurrentCloudEvtConfig.Mqtt
func newMqttConnection(connectionType ConnectionType) (cloudEvtConnection, error) {
mqttConfig := CurrentCloudEvtConfig.Mqtt
ctx := context.Background()
conn, err := net.Dial("tcp", mqttConfig.Url)
......@@ -134,7 +152,7 @@ func newMqttConnection(connectionType model.ConnectionType) (CloudEvtConnection,
}
switch connectionType {
case model.Pub:
case Pub:
connectionConfig := &paho.ClientConfig{
ClientID: mqttConfig.ClientId,
Conn: conn,
......@@ -150,7 +168,7 @@ func newMqttConnection(connectionType model.ConnectionType) (CloudEvtConnection,
return nil, err
}
return sender, nil
case model.Sub:
case Sub:
connectionConfig := &paho.ClientConfig{
ClientID: mqttConfig.ClientId,
Conn: conn,
......@@ -171,9 +189,9 @@ func newMqttConnection(connectionType model.ConnectionType) (CloudEvtConnection,
}
}
func newAmqpConnection(connectionType model.ConnectionType) (CloudEvtConnection, error) {
func newAmqpConnection(connectionType ConnectionType) (cloudEvtConnection, error) {
amqpUrl, node, opts := parseAmqpConfig()
if connectionType == model.Sub || connectionType == model.Pub {
if connectionType == Sub || connectionType == Pub {
protocol, err := ceamqp.NewProtocol(amqpUrl, node, []amqp.ConnOption{}, []amqp.SessionOption{}, opts...)
if err != nil {
return nil, err
......@@ -186,7 +204,7 @@ func newAmqpConnection(connectionType model.ConnectionType) (CloudEvtConnection,
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = config.CurrentCloudEvtConfig.Ampq.Url
amqpUrl = CurrentCloudEvtConfig.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
if err != nil {
log.Fatal(err)
......
package model
package cloudevtprov
type ProtocolType string
type ConnectionType string
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment