Skip to content
Snippets Groups Projects
Commit 354093c3 authored by Steffen Schulze's avatar Steffen Schulze
Browse files

package renamed

parent c355ae57
No related branches found
No related tags found
No related merge requests found
package cloudevtprov
package cloudeventprovider
import (
"context"
"fmt"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"time"
)
type CloudEvtClient struct {
type CloudEventProviderClient struct {
context context.Context
protocol ProtocolType
conn cloudEvtConnection
conn cloudEventConnection
connectionClient client.Client
connectionType ConnectionType
alive bool
}
// newClient ignores topic parameter for http connections
func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, error) {
protocol := currentCloudEvtConfig.Protocol
func newClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
protocol := currentCloudEventConfig.Protocol
ctx := context.Background()
var connection interface{}
var err error
if protocol == Http {
httpConfig := currentCloudEvtConfig.Http
httpConfig := currentCloudEventConfig.Http
switch connectionType {
case Pub, Req:
......@@ -40,7 +41,7 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
}
} else {
connection, err = newCloudEvtConnection(protocol, connectionType, topic)
connection, err = newCloudEventConnection(protocol, connectionType, topic)
}
if err != nil {
......@@ -52,13 +53,13 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
return nil, err
}
// if http cloudEvtConnection = nil
cloudEvtConnection, _ := connection.(cloudEvtConnection)
// if http cloudEventConnection = nil
cloudEventConnection, _ := connection.(cloudEventConnection)
newClient := &CloudEvtClient{
newClient := &CloudEventProviderClient{
context: ctx,
protocol: protocol,
conn: cloudEvtConnection,
conn: cloudEventConnection,
connectionClient: connectionClient,
connectionType: connectionType,
alive: true,
......@@ -67,7 +68,7 @@ func newClient(connectionType ConnectionType, topic string) (*CloudEvtClient, er
return newClient, nil
}
func (c *CloudEvtClient) Close() error {
func (c *CloudEventProviderClient) Close() error {
//TODO: what about closing http?
if c.protocol == Http {
return nil
......@@ -81,11 +82,11 @@ func (c *CloudEvtClient) Close() error {
return nil
}
func (c *CloudEvtClient) Alive() bool {
func (c *CloudEventProviderClient) Alive() bool {
return c.alive
}
func (c *CloudEvtClient) Pub(event event.Event) error {
func (c *CloudEventProviderClient) Pub(event event.Event) error {
if c.connectionType == Pub {
result := c.connectionClient.Send(c.context, event)
if err := getResultError(result); err != nil {
......@@ -98,7 +99,7 @@ func (c *CloudEvtClient) Pub(event event.Event) error {
}
// Sub method is blocking. Use it in a goroutine.
func (c *CloudEvtClient) Sub(fn func(event event.Event)) error {
func (c *CloudEventProviderClient) Sub(fn func(event event.Event)) error {
if c.connectionType == Sub {
if err := c.connectionClient.StartReceiver(c.context, fn); err != nil {
return err
......@@ -109,7 +110,7 @@ func (c *CloudEvtClient) Sub(fn func(event event.Event)) error {
}
}
func (c *CloudEvtClient) Request(event event.Event, timeOut time.Duration) (*event.Event, error) {
func (c *CloudEventProviderClient) Request(event event.Event, timeOut time.Duration) (*event.Event, error) {
if c.connectionType == Req {
ctx := context.WithValue(c.context, "timeOut", timeOut)
response, result := c.connectionClient.Request(ctx, event)
......@@ -123,7 +124,7 @@ func (c *CloudEvtClient) Request(event event.Event, timeOut time.Duration) (*eve
}
// Reply method is blocking. Use it in a goroutine.
func (c *CloudEvtClient) Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
func (c *CloudEventProviderClient) Reply(responseFunc func(ctx context.Context, event event.Event) (*event.Event, error)) error {
if c.connectionType == Rep {
switch c.protocol {
case Nats:
......
package cloudevtprov
package cloudeventprovider
import (
cloudevents "github.com/cloudevents/sdk-go/v2"
......@@ -18,7 +18,7 @@ func NewEvent(eventSource string, eventType string, data map[string]any) (event.
return newEvent, nil
}
func NewClient(connectionType ConnectionType, topic string) (*CloudEvtClient, error) {
func NewClient(connectionType ConnectionType, topic string) (*CloudEventProviderClient, error) {
if err := loadConfig(); err != nil {
panic(err)
}
......
package cloudevtprov
package cloudeventprovider
import (
"errors"
"fmt"
"github.com/mitchellh/mapstructure"
"log"
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
)
import "github.com/spf13/viper"
var cloudEvtProvViper *viper.Viper
......@@ -55,12 +56,12 @@ type cloudEvtProvConfiguration struct {
} `mapstructure:"http"`
}
var currentCloudEvtConfig cloudEvtProvConfiguration
var currentCloudEventConfig cloudEvtProvConfiguration
func loadConfig() error {
readConfig()
if err := cloudEvtProvViper.Unmarshal(&currentCloudEvtConfig); err != nil {
if err := cloudEvtProvViper.Unmarshal(&currentCloudEventConfig); err != nil {
return err
}
if err := checkConfig(); err != nil {
......@@ -89,18 +90,18 @@ func checkIfProtocolConfigIsSet(protocol string) error {
switch protocol {
case string(Http):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Http)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Http)
case string(Nats):
//TODO: check for subject name conventions https://docs.nats.io/nats-concepts/subjects
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Nats)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Nats)
case string(NatsJetstream):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.NatsJetstream)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.NatsJetstream)
case string(Kafka):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Kafka)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Kafka)
case string(Mqtt):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Mqtt)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Mqtt)
case string(Amqp):
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEvtConfig.Ampq)
err = checkIfAllConfigKeysAreSet(protocol, currentCloudEventConfig.Ampq)
default:
err = fmt.Errorf("protocol %s is not supported", protocol)
}
......
package cloudevtprov
package cloudeventprovider
import (
"errors"
......@@ -7,10 +7,10 @@ import (
func TestOptionalConfig(t *testing.T) {
// optional config timeoutInSec is missing
t.Setenv("CLOUDEVTPROV_PROTOCOL", "nats")
t.Setenv("CLOUDEVTPROV_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVTPROV_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVTPROV_NATS_QUEUEGROUP", "test")
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_NATS_SUBJECT", "events")
t.Setenv("CLOUDEVENTPROVIDER_NATS_QUEUEGROUP", "test")
err := loadConfig()
if err != nil {
......@@ -20,8 +20,8 @@ func TestOptionalConfig(t *testing.T) {
func TestMissingConfig(t *testing.T) {
// subject config is missing
t.Setenv("CLOUDEVTPROV_PROTOCOL", "nats")
t.Setenv("CLOUDEVTPROV_NATS_URL", "http://localhost:4222")
t.Setenv("CLOUDEVENTPROVIDER_PROTOCOL", "nats")
t.Setenv("CLOUDEVENTPROVIDER_NATS_URL", "http://localhost:4222")
err := loadConfig()
if !errors.Is(errors.Unwrap(err), ErrConfigKeyMissing) {
......
package cloudevtprov
package cloudeventprovider
import (
"context"
"fmt"
"log"
"net"
"net/url"
"strings"
"time"
"github.com/Azure/go-amqp"
"github.com/Shopify/sarama"
ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
......@@ -12,19 +18,14 @@ import (
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/eclipse/paho.golang/paho"
"github.com/nats-io/nats.go"
"log"
"net"
"net/url"
"strings"
"time"
)
type cloudEvtConnection interface {
type cloudEventConnection interface {
Close(ctx context.Context) error
}
func newCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
var newConnection cloudEvtConnection
func newCloudEventConnection(protocolType ProtocolType, connectionType ConnectionType, topic string) (cloudEventConnection, error) {
var newConnection cloudEventConnection
var err error
switch protocolType {
......@@ -59,8 +60,8 @@ func newCloudEvtConnection(protocolType ProtocolType, connectionType ConnectionT
return newConnection, nil
}
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
kafkaConfig := currentCloudEvtConfig.Kafka
func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
kafkaConfig := currentCloudEventConfig.Kafka
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
......@@ -82,8 +83,8 @@ func newKafkaConnection(connectionType ConnectionType, topic string) (cloudEvtCo
}
}
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
natsConfig := currentCloudEvtConfig.Nats
func newNatsConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsConfig := currentCloudEventConfig.Nats
var natsOptions []nats.Option
if natsConfig.TimeoutInSec != 0*time.Second {
natsOptions = append(natsOptions, nats.Timeout(natsConfig.TimeoutInSec*time.Second))
......@@ -124,8 +125,8 @@ func newNatsConnection(connectionType ConnectionType, topic string) (cloudEvtCon
}
}
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
natsJetstreamConfig := currentCloudEvtConfig.NatsJetstream
func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
natsJetstreamConfig := currentCloudEventConfig.NatsJetstream
var natsJetstreamOptions []nats.Option
if natsJetstreamConfig.TimeoutInSec != 0*time.Second {
natsJetstreamOptions = append(natsJetstreamOptions, nats.Timeout(natsJetstreamConfig.TimeoutInSec*time.Second))
......@@ -154,8 +155,8 @@ func newNatsJetstreamConnection(connectionType ConnectionType, topic string) (cl
}
}
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
mqttConfig := currentCloudEvtConfig.Mqtt
func newMqttConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
mqttConfig := currentCloudEventConfig.Mqtt
ctx := context.Background()
conn, err := net.Dial("tcp", mqttConfig.Url)
......@@ -201,7 +202,7 @@ func newMqttConnection(connectionType ConnectionType, topic string) (cloudEvtCon
}
}
func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEvtConnection, error) {
func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEventConnection, error) {
amqpUrl, node, opts := parseAmqpConfig()
if connectionType == Sub || connectionType == Pub {
var protocol *ceamqp.Protocol
......@@ -222,7 +223,7 @@ func newAmqpConnection(connectionType ConnectionType, topic string) (cloudEvtCon
func parseAmqpConfig() (amqpUrl, node string, opts []ceamqp.Option) {
// TODO: authentication over URL is not safe!
amqpUrl = currentCloudEvtConfig.Ampq.Url
amqpUrl = currentCloudEventConfig.Ampq.Url
parsedUrl, err := url.Parse(amqpUrl)
if err != nil {
log.Fatal(err)
......
package cloudevtprov
package cloudeventprovider
import "errors"
......
package cloudevtprov
package cloudeventprovider
type ProtocolType string
type ConnectionType string
......
package cloudevtprov
package cloudeventprovider
import (
"bytes"
"context"
"io"
"log"
"sync"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/nats-io/nats.go"
"io"
"log"
"sync"
)
type natsReplyConsumerInterface interface {
......
package cloudevtprov
package cloudeventprovider
import (
"bytes"
"context"
"log"
"time"
cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/nats-io/nats.go"
"log"
"time"
)
type natsRequester struct {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment