Skip to content
Snippets Groups Projects
Commit 595a846f authored by Steffen Schulze's avatar Steffen Schulze
Browse files

Merge branch 'rename' into 'main'

Rename

See merge request eclipse/xfsc/libraries/messaging/cloudeventprovider!3
parents c355ae57 6a6aada8
No related branches found
Tags v0.0.7
No related merge requests found
......@@ -4,12 +4,12 @@
Add the module as dependency using go mod:
`go get gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov`
`go get gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider`
And import the module in your code:
```go
import "gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudevtprov"
import "gitlab.eclipse.org/eclipse/xfsc/libraries/messaging/cloudeventprovider"
```
## Configure protocol and corresponding config with yaml File
......@@ -25,14 +25,14 @@ nats:
```go
func main() {
topic := "events"
c, err := cloudevtprov.CreateClient(cloudevtprov.Pub, topic)
c, err := cloudeventprovider.CreateClient(cloudeventprovider.Pub, topic)
if err != nil {
log.Fatal(err)
}
defer c.Close()
data := map[string]string{"hello": "world"}
event, err := cloudevtprov.CreateEvent("example/uri", "example.type", data)
event, err := cloudeventprovider.CreateEvent("example/uri", "example.type", data)
if err != nil {
log.Fatal(err)
}
......@@ -51,7 +51,7 @@ func receive(event cloudevents.Event) {
func main() {
topic := "events"
client, err := cloudevtprov.CreateClient(cloudevtprov.Sub, topic)
client, err := cloudeventprovider.CreateClient(cloudeventprovider.Sub, topic)
if err != nil {
log.Fatal(err)
}
......
package cloudevtprov
package cloudeventprovider
import (
"context"
"fmt"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"time"
)
type CloudEvtClient struct {
type CloudEventProviderClient struct {
context context.Context
protocol ProtocolType
conn cloudEvtConnection
conn cloudEventConnection
connectionClient client.Client
connectionType ConnectionType
alive bool
}
// newClient ignores topic parameter for http connections
func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, error) {
protocol := currentCloudEvtConfig.Protocol
func newClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
protocol := currentCloudEventConfig.Protocol
ctx := context.Background()
var connection interface{}
var err error
if protocol == Http {
httpConfig := currentCloudEvtConfig.Http
httpConfig := currentCloudEventConfig.Http
switch connectionType {
case Pub, Req:
......@@ -40,7 +41,7 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
}
} else {
connection, err = newCloudEvtConnection(protocol, connectionType, topic)
connection, err = newCloudEventConnection(protocol, connectionType, topic)
}
if err != nil {
......@@ -52,13 +53,13 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
return nil, err
}
// if http cloudEvtConnection = nil
cloudEvtConnection, _ := connection.(cloudEvtConnection)
// if http cloudEventConnection = nil
cloudEventConnection, _ := connection.(cloudEventConnection)
newClient := &CloudEvtClient{
newClient := &CloudEventProviderClient{
context: ctx,
protocol: protocol,
conn: cloudEvtConnection,
conn: cloudEventConnection,
connectionClient: connectionClient,
connectionType: connectionType,
alive: true,
......@@ -67,7 +68,7 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
return newClient, nil
}
func (c *CloudEvtClient) Close() error {
func (c *CloudEventProviderClient) Close() error {
//TODO: what about closing http?
if c.protocol == Http {
return nil
......@@ -81,11 +82,11 @@ func (c *CloudEvtClient) Close() error {
return nil
}
func (c *CloudEvtClient) Alive() bool {
func (c *CloudEventProviderClient) Alive() bool {
return c.alive
}
func (c *CloudEvtClient) Pub(event event.Event) error {
func (c *CloudEventProviderClient) Pub(event event.Event) error {
if c.connectionType == Pub {
result := c.connectionClient.Send(c.context, event)
if err := getResultError(result); err != nil {
......@@ -98,7 +99,7 @@ func (c *CloudEvtClient) Pub(event event.Event) error {
}
// Sub method is blocking. Use it in a goroutine.
func (c *CloudEvtClient) Sub(fn func(event event.Event)) error {
func (c *CloudEventProviderClient) Sub(fn func(event event.Event)) error {
if c.connectionType == Sub {
if err := c.connectionClient.StartReceiver(c.context, fn); err != nil {
return err
......@@ -109,7 +110,7 @@ func (c *CloudEvtClient) Sub(fn func(event event.Event)) error {
}
}
func (c *CloudEvtClient) Request(event event.Event, timeOut time.Duration) (*event.Event, error) {
func (c *CloudEventProviderClient) Request(event event.Event, timeOut time.Duration) (*event.Event, error) {
if c.connectionType == Req {
ctx := context.WithValue(c.context, "timeOut", timeOut)
response, result := c.connectionClient.Request(ctx, event)
......@@ -123,7 +124,7 @@ func (c *CloudEvtClient) Request(event event.Event, timeOut time.Duration) (*eve
}
// Reply method is blocking. Use it in a goroutine.
func (c *CloudEvtClient) Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
func (c *CloudEventProviderClient) Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
if c.connectionType == Rep {
switch c.protocol {
case Nats:
......
package cloudevtprov
package cloudeventprovider
import (
cloudevents "github.com/cloudevents/sdk-go/v2"
......@@ -18,7 +18,7 @@ func NewEvent(eventSource string, eventType string, data map[string]any) (event.
return newEvent, nil
}
func NewClient(connectionType ConnectionType, topic string) (*CloudEvtClient, error) {
func NewClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
if err := loadConfig(); err != nil {
panic(err)
}
......
package cloudevtprov
package cloudeventprovider
import (
"errors"
"fmt"
"github.com/mitchellh/mapstructure"
"log"
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
)
import "github.com/spf13/viper"
var cloudEvtProvViper *viper.Viper
var cloudEventProviderViper *viper.Viper
func init() {
cloudEvtProvViper = viper.New()
cloudEventProviderViper = viper.New()
}
type cloudEvtProvConfiguration struct {
type cloudEventProviderConfiguration struct {
Protocol ProtocolType `mapstructure:"protocol"`
Nats struct {
......@@ -55,12 +56,12 @@ type cloudEvtProvConfiguration struct {
} `mapstructure:"http"`
}
var currentCloudEvtConfig cloudEvtProvConfiguration
var currentCloudEventConfig cloudEventProviderConfiguration
func loadConfig() error {
readConfig()
if err := cloudEvtProvViper.Unmarshal(&currentCloudEvtConfig); err != nil {
if err := cloudEventProviderViper.Unmarshal(&currentCloudEventConfig); err != nil {
return err
}
if err := checkConfig(); err != nil {
......@@ -70,8 +71,8 @@ func loadConfig() error {
}
func checkConfig() error {
if cloudEvtProvViper.IsSet("protocol") {
protocol := cloudEvtProvViper.GetString("protocol")
if cloudEventProviderViper.IsSet("protocol") {
protocol := cloudEventProviderViper.GetString("protocol")
err := checkIfProtocolConfigIsSet(protocol)
if err != nil {
return err
......@@ -89,18 +90,18 @@ func checkIfProtocolConfigIsSet(protocol string) error {
switch protocol {
case string(Http):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Http)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Http)
case string(Nats):
//TODO: check for subject name conventions https://docs.nats.io/nats-concepts/subjects
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Nats)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Nats)
case string(NatsJetstream):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.NatsJetstream)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.NatsJetstream)
case string(Kafka):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Kafka)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Kafka)
case string(Mqtt):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Mqtt)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Mqtt)
case string(Amqp):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Ampq)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
}
......@@ -114,7 +115,7 @@ func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
}
for configKey := range configMap {
if !cloudEvtProvViper.IsSet(protocol + "." + configKey) {
if !cloudEventProviderViper.IsSet(protocol + "." + configKey) {
return fmt.Errorf("%w: missing configKey %s for protocol %s", ErrConfigKeyMissing, configKey, protocol)
}
}
......@@ -122,18 +123,18 @@ func checkIfAllConfigKeysAreSet(protocol string, config interface{}) error {
}
func readConfig() {
cloudEvtProvViper.SetConfigName("config")
cloudEvtProvViper.SetConfigType("yaml")
cloudEvtProvViper.AddConfigPath(".")
cloudEventProviderViper.SetConfigName("config")
cloudEventProviderViper.SetConfigType("yaml")
cloudEventProviderViper.AddConfigPath(".")
cloudEvtProvViper.SetEnvPrefix("CLOUDEVTPROV")
cloudEvtProvViper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
cloudEventProviderViper.SetEnvPrefix("CLOUDEVENTPROVIDER")
cloudEventProviderViper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
if err := cloudEvtProvViper.ReadInConfig(); err != nil {
if err := cloudEventProviderViper.ReadInConfig(); err != nil {
var configFileNotFoundError viper.ConfigFileNotFoundError
if errors.As(err, &configFileNotFoundError) {
log.Printf("Configuration not found but environment variables will be taken into account.")
cloudEvtProvViper.AutomaticEnv()
cloudEventProviderViper.AutomaticEnv()
}
}
}
package cloudevtprov
package cloudeventprovider
import (
"errors"
......@@ -7,10 +7,10 @@ import (
func TestOptionalConfig(t *testing.T) {
// optional config timeoutInSec is missing
t.Setenv("CLOUDEVTPROV_PROTOCOL", "nats")
t.Setenv("CLOUDEVTPROV_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVTPROV_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVTPROV_NATS_QUEUEGROUP", "test")
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVENTPROVIDER_NATS_QUEUEGROUP", "test")
err := loadConfig()
if err != nil {
......@@ -20,8 +20,8 @@ func TestOptionalConfig(t *testing.T) {
func TestMissingConfig(t *testing.T) {
// subject config is missing
t.Setenv("CLOUDEVTPROV_PROTOCOL", "nats")
t.Setenv("CLOUDEVTPROV_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
err := loadConfig()
if !errors.Is(errors.Unwrap(err), ErrConfigKeyMissing) {
......
package cloudevtprov
package cloudeventprovider
import (
"context"
"fmt"
"log"
"net"
"net/url"
"strings"
"time"
"github.com/Azure/go-amqp"
"github.com/Shopify/sarama"
ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
......@@ -12,19 +18,14 @@ import (
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"github.com/nats-io/nats.go"
"log"
"net"
"net/url"
"strings"
"time"
)
type cloudEvtConnection interface {
type cloudEventConnection interface {
Close(ctx context.Context) error
}
func newCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
var newConnection cloudEvtConnection
func newCloudEventConnection(protocolType ProtocolType, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
var newConnection cloudEventConnection
var err error
switch protocolType {
......@@ -54,13 +55,13 @@ func newCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionT
return nil, err
}
default:
return nil, fmt.Errorf("unknown protocolType: %s. Could not create cloudEvtConnection", protocolType)
return nil, fmt.Errorf("unknown protocolType: %s. Could not create cloudEventConnection", protocolType)
}
return newConnection, nil
}
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
kafkaConfig := currentCloudEvtConfig.Kafka
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
kafkaConfig := currentCloudEventConfig.Kafka
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
......@@ -82,8 +83,8 @@ func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEvtCo
}
}
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
natsConfig := currentCloudEvtConfig.Nats
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsConfig := currentCloudEventConfig.Nats
var natsOptions []nats.Option
if natsConfig.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(natsConfig.TimeoutInSec*time.Second))
......@@ -124,8 +125,8 @@ func newNatsConnection(connectionType ConnectionType, topic string) (cloudEvtCon
}
}
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
natsJetstreamConfig := currentCloudEvtConfig.NatsJetstream
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsJetstreamConfig := currentCloudEventConfig.NatsJetstream
var natsJetstreamOptions []nats.Option
if natsJetstreamConfig.TimeoutInSec != 0*time.Second {
natsJetstreamOptions = append(natsJetstreamOptions, nats.Timeout(natsJetstreamConfig.TimeoutInSec*time.Second))
......@@ -154,8 +155,8 @@ func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cl
}
}
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
mqttConfig := currentCloudEvtConfig.Mqtt
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
mqttConfig := currentCloudEventConfig.Mqtt
ctx := context.Background()
conn, err := net.Dial("tcp", mqttConfig.Url)
......@@ -201,7 +202,7 @@ func newMqttConnection(connectionType ConnectionType, topic string) (cloudEvtCon
}
}
func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
amqpUrl, node, opts := parseAmqpConfig()
if connectionType == Sub || connectionType == Pub {
var protocol *ceamqp.Protocol
......@@ -222,7 +223,7 @@ func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEvtCon
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = currentCloudEvtConfig.Ampq.Url
amqpUrl = currentCloudEventConfig.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
if err != nil {
log.Fatal(err)
......
package cloudevtprov
package cloudeventprovider
import "errors"
......
package cloudevtprov
package cloudeventprovider
type ProtocolType string
type ConnectionType string
......
package cloudevtprov
package cloudeventprovider
import (
"bytes"
"context"
"io"
"log"
"sync"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/nats-io/nats.go"
"io"
"log"
"sync"
)
type natsReplyConsumerInterface interface {
......
package cloudevtprov
package cloudeventprovider
import (
"bytes"
"context"
"log"
"time"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/nats-io/nats.go"
"log"
"time"
)
type natsRequester struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment