Skip to content
Snippets Groups Projects
Commit 9b32807b authored by Steffen Schulze's avatar Steffen Schulze
Browse files

routing finalized for inbound/outbound

parent e02e83c0
No related branches found
No related tags found
1 merge request!10Refactoring
Pipeline #40459 failed with stages
in 3 minutes and 37 seconds
......@@ -93,7 +93,7 @@ func (app *application) InvitationMessage(context *gin.Context) {
return
}
mediateeBase.RemoteDid = uuid.NewString()
mediateeBase.RemoteDid = uuid.NewString() //temporary
isBlocked, err := app.mediator.Database.IsBlocked(mediateeBase.RemoteDid)
if err != nil {
......@@ -107,7 +107,7 @@ func (app *application) InvitationMessage(context *gin.Context) {
return
}
err = app.mediator.ConnectionManager.StoreConnection(mediateeBase.Protocol, mediateeBase.RemoteDid, mediateeBase.Topic, mediateeBase.Properties, mediateeBase.EventType)
err = app.mediator.ConnectionManager.StoreConnection(mediateeBase.Protocol, mediateeBase.RemoteDid, mediateeBase.Topic, mediateeBase.Properties, mediateeBase.EventType, []string{})
if err != nil {
if err == connectionmanager.ERROR_PROTOCOL_NOT_SUPPORTED {
_ = app.SendPr(context, protocol.PR_PROTOCOL_NOT_SUPPORTED, err)
......
......@@ -21,7 +21,7 @@ var ERROR_PROTOCOL_NOT_SUPPORTED = errors.New("protocol not supported")
var ERROR_INTERNAL = errors.New("internal error")
var ERROR_CONNECTION_ALREADY_EXISTS = errors.New("connection already exists")
func (c *ConnectionManager) StoreConnection(protocol string, remoteDid string, topic string, properties map[string]string, eventType string) (err error) {
func (c *ConnectionManager) StoreConnection(protocol string, remoteDid string, topic string, properties map[string]string, eventType string, recipients []string) (err error) {
switch config.CurrentConfiguration.CloudForwarding.Protocol {
case config.HTTP:
if protocol != config.HTTP {
......@@ -45,7 +45,7 @@ func (c *ConnectionManager) StoreConnection(protocol string, remoteDid string, t
if isMediated {
return ERROR_CONNECTION_ALREADY_EXISTS
}
err = c.database.AddMediatee(database.Mediatee{RemoteDid: remoteDid, Protocol: protocol, Topic: topic, EventType: eventType, Properties: properties})
err = c.database.AddMediatee(database.Mediatee{RemoteDid: remoteDid, Protocol: protocol, Topic: topic, EventType: eventType, Properties: properties, RecipientDids: recipients})
if err != nil {
return ERROR_INTERNAL
}
......
package messaging
import "encoding/json"
type ConnectorMessage struct {
RequestId string `json:"accountId"`
Did string `json:"did"`
Payload json.RawMessage `json:"payload"`
}
type InvitationNotify struct {
RequestId string `json:"requestId"`
Did string `json:"did"`
InvitationId string `json:"invitationId"`
Did string `json:"did"`
}
......@@ -11,6 +11,7 @@ import (
"text/template"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
"gitlab.eclipse.org/eclipse/xfsc/common-services/didcomm-connector/didcomm"
"gitlab.eclipse.org/eclipse/xfsc/common-services/didcomm-connector/internal/config"
"gitlab.eclipse.org/eclipse/xfsc/common-services/didcomm-connector/mediator"
......@@ -44,7 +45,7 @@ func ReceiveMessage(mediator *mediator.Mediator) {
}
defer client.Close()
routing := NewRouting(mediator)
// Use a WaitGroup to wait for a message to arrive
wg := sync.WaitGroup{}
wg.Add(1)
......@@ -80,10 +81,24 @@ func ReceiveMessage(mediator *mediator.Mediator) {
},
}
err = mediator.Database.AddMessage(content.RequestId, attachment)
var body = make(map[string]interface{})
body["next"] = content.Did
bodyJson, err := json.Marshal(body)
if err != nil {
config.Logger.Error("could not add message to inbox", "err", err)
panic(err)
}
message := didcomm.Message{
Id: uuid.NewString(),
Type: PIURI_ROUTING_FORWARD,
To: &[]string{mediator.Did},
Attachments: &[]didcomm.Attachment{attachment},
Body: string(bodyJson),
}
routing.handleForward(message, false)
})
if err != nil {
config.Logger.Error("Error in subscription of cloud event", "msg", err)
......@@ -109,6 +124,9 @@ func sendCloudEvent(message any, mediatee *database.Mediatee) (err error) {
config.Logger.Info(fmt.Sprintf("message to send as cloud event: %s", message))
mediatee.Properties["routingKey"] = mediatee.RoutingKey
mediatee.Properties["remoteDid"] = mediatee.RemoteDid
jsonData, err := json.Marshal(mediatee.Properties)
if err != nil {
......
......@@ -320,32 +320,6 @@ func (h *CoordinateMediation) handleMediationRequest(message didcomm.Message, be
return PR_INVALID_REQUEST, err
}
err = h.mediator.ConnectionManager.StoreConnection(invitation.Protocol, *message.From, invitation.Topic, invitation.Properties, invitation.EventType)
if err != nil {
config.Logger.Error("error finalizing mediatee", err)
return PR_INTERNAL_SERVER_ERROR, err
}
h.mediator.Database.DeleteMediatee(id)
key, err := db.GetRoutingKey(*message.From)
if err != nil {
config.Logger.Error("Can not get routing key from db", err)
return PR_INTERNAL_SERVER_ERROR, err
}
if key != "" {
response = didcomm.Message{
Id: uuid.Must(uuid.NewRandom()).String(),
Type: "https://didcomm.org/coordinate-mediation/3.0/mediate-deny",
Body: "{}",
From: &h.mediator.Did,
}
return response, nil
}
service, err := h.mediator.CreateMediatorService()
if err != nil {
config.Logger.Error("Mediator service creation failed", err)
......@@ -386,6 +360,32 @@ func (h *CoordinateMediation) handleMediationRequest(message didcomm.Message, be
From: &h.mediator.Did,
}
err = h.mediator.ConnectionManager.StoreConnection(invitation.Protocol, *message.From, invitation.Topic, invitation.Properties, invitation.EventType, []string{routingKey})
if err != nil {
config.Logger.Error("error finalizing mediatee", err)
return PR_INTERNAL_SERVER_ERROR, err
}
h.mediator.Database.DeleteMediatee(id)
key, err := db.GetRoutingKey(*message.From)
if err != nil {
config.Logger.Error("Can not get routing key from db", err)
return PR_INTERNAL_SERVER_ERROR, err
}
if key != "" {
response = didcomm.Message{
Id: uuid.Must(uuid.NewRandom()).String(),
Type: "https://didcomm.org/coordinate-mediation/3.0/mediate-deny",
Body: "{}",
From: &h.mediator.Did,
}
return response, nil
}
mediatee, err := h.mediator.Database.GetMediatee(*message.From)
if err != nil {
......@@ -394,8 +394,8 @@ func (h *CoordinateMediation) handleMediationRequest(message didcomm.Message, be
}
inv := messaging.InvitationNotify{
RequestId: id,
Did: mediatee.RemoteDid,
InvitationId: id,
Did: mediatee.RoutingKey,
}
err = sendCloudEvent(inv, mediatee)
......
......@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"net/http"
"strings"
"time"
"gitlab.eclipse.org/eclipse/xfsc/common-services/didcomm-connector/didcomm"
......@@ -34,7 +33,7 @@ func (rt *Routing) Handle(message didcomm.Message) (response didcomm.Message, er
switch message.Type {
case PIURI_ROUTING_FORWARD:
response, err = rt.handleForward(message)
response, err = rt.handleForward(message, true)
default:
err = intErr.ErrUnknownMessageType
response = PR_UNKNOWN_MESSAGE_TYPE
......@@ -42,7 +41,7 @@ func (rt *Routing) Handle(message didcomm.Message) (response didcomm.Message, er
return
}
func (rt *Routing) handleForward(message didcomm.Message) (pr ProblemReport, err error) {
func (rt *Routing) handleForward(message didcomm.Message, inbound bool) (pr ProblemReport, err error) {
t := uint64(time.Now().UTC().Unix())
if message.ExpiresTime != nil && *message.ExpiresTime < t {
return PR_EXPIRED_MESSAGE, errors.New("message has expired")
......@@ -57,74 +56,82 @@ func (rt *Routing) handleForward(message didcomm.Message) (pr ProblemReport, err
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
isRegistered, err := rt.mediator.Database.IsRecipientDidRegistered(body.Next)
if err != nil {
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
if len(*message.Attachments) != 1 {
return PR_COULD_NOT_FORWARD_MESSAGE, errors.New("message must have exactly one attachment")
}
attachment := (*message.Attachments)[0]
if isRegistered {
config.Logger.Info("Recipient is registered")
isMediated, err := rt.mediator.Database.IsMediated(body.Next)
if strings.HasPrefix(body.Next, "did:") {
err = rt.mediator.Database.AddMessage(body.Next, attachment)
if err != nil {
config.Logger.Error("could not add message to inbox", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
return PR_COULD_NOT_FORWARD_MESSAGE, err
} else {
messageB64 := attachment.Data.(didcomm.AttachmentDataBase64).Value.Base64
messageDecoded, err := base64.StdEncoding.DecodeString(messageB64)
if err != nil {
config.Logger.Error("Error decoding message", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
mediatee, err := rt.mediator.Database.GetMediateeByRecipientDid(body.Next)
if err != nil {
config.Logger.Error("error getting mediatee", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
if err != nil {
config.Logger.Error("error checking mediatee", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
var content map[string]interface{}
if isMediated {
err = json.Unmarshal(messageDecoded, &content)
err = rt.mediator.Database.AddMessage(body.Next, attachment)
if err != nil {
config.Logger.Error("could not add message to inbox", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
return PR_COULD_NOT_FORWARD_MESSAGE, err
if err != nil {
config.Logger.Error("marshalling body failed", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
} else {
// send message to cPCM (cloud)
err = SendMessage(content, mediatee)
if err != nil {
config.Logger.Error("unable to send message to cloud", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
}
} /*else {
config.Logger.Info("Recipient is not registered")
didDoc, err := rt.mediator.DidResolver.ResolveDid(body.Next)
isRegistered, err := rt.mediator.Database.IsRecipientDidRegistered(body.Next)
if err != nil {
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
if len(didDoc.Service) != 1 {
return PR_COULD_NOT_FORWARD_MESSAGE, errors.New("didDoc must have exactly one service")
}
service := didDoc.Service[0]
serviceEndpoint := service.ServiceEndpoint.(didcomm.ServiceKindDidCommMessaging)
pr, err := rt.ForwardMessage(attachment, body.Next, serviceEndpoint.Value.Uri)
if err != nil {
return pr, err
if isRegistered {
/*here it should be in future a iteration over all recipients, together with a format selection. The forward is not precisly specified in the moment, and should
be discussed deeper. E.g. Recipient forward types, recipient forward version etc. pp For now here is just an simple forward to nats, potentially this could be enhanced by web
socket etc.
*/
config.Logger.Info("Recipient is registered")
if !inbound {
err = rt.mediator.Database.AddMessage(body.Next, attachment)
if err != nil {
config.Logger.Error("could not add message to inbox", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
} else {
//In the case of a did here must be later on a decision if a service endpoint should be used or
//to use the message box. for now is it the messagebox
messageB64 := attachment.Data.(didcomm.AttachmentDataBase64).Value.Base64
messageDecoded, err := base64.StdEncoding.DecodeString(messageB64)
if err != nil {
config.Logger.Error("Error decoding message", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
mediatee, err := rt.mediator.Database.GetMediateeByRecipientDid(body.Next)
if err != nil {
config.Logger.Error("error getting mediatee", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
var content map[string]interface{}
err = json.Unmarshal(messageDecoded, &content)
if err != nil {
config.Logger.Error("marshalling body failed", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
err = SendMessage(content, mediatee)
if err != nil {
config.Logger.Error("unable to send message to cloud", "err", err)
return PR_COULD_NOT_FORWARD_MESSAGE, err
}
}
}
}
*/
return PR_COULD_NOT_FORWARD_MESSAGE, err
return didcomm.Message{}, err
}
func (rt *Routing) ForwardMessage(message didcomm.Attachment, recipientDid string, endpoint string) (pr ProblemReport, err error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment