-
Ignacio Prusiel Mariscal authoredIgnacio Prusiel Mariscal authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
migrate.go 4.90 KiB
/*******************************************************************************
* Copyright 2023 Bull SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
******************************************************************************/
package migrate
import (
"fmt"
"log"
"os"
"time"
"github.com/rabbitmq/amqp091-go"
veleroClientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
"gopkg.in/yaml.v3"
"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/intent-based-migration-controller/ibmc-agent/pkg/velero"
"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/meta-orchestrator/deployment-controller/pkg/k8s"
"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/meta-orchestrator/deployment-controller/pkg/rabbitmq"
)
const (
actionBackup = "backup"
actionRestore = "restore"
actionMigration = "migration"
)
// Run starts the migration process
func Run() {
// Set up variables
clientset := k8s.Buildk8sClientset()
veleroClient := k8s.BuildVeleroClientset()
clusterName := k8s.GetSpokeClusterName(clientset)
// Set up RabbitMQ connection
conn, ch := rabbitmq.SetupRabbitMQ()
defer conn.Close()
defer ch.Close()
msgs := rabbitmq.SetupQueue(ch, os.Getenv("MO_EXCHANGE"), clusterName)
log.Println("Awaiting messages...")
for msg := range msgs {
processMessage(msg, ch, veleroClient)
log.Println("Awaiting messages...")
}
}
// Handles incoming messages
func processMessage(msg amqp091.Delivery, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
log.Println("Message received:")
fmt.Println(" Msg Body: ", msg.Body)
var migrationMsg rabbitmq.MigrationMsg
if err := yaml.Unmarshal(msg.Body, &migrationMsg); err != nil {
log.Printf("Error parsing message: %v", err)
return
}
fmt.Println(" WorkloadID: ", migrationMsg.WorkloadID)
fmt.Println(" Action: ", migrationMsg.Action)
fmt.Println(" SourceCluster: ", migrationMsg.SourceCluster)
fmt.Println(" TargetCluster: ", migrationMsg.TargetCluster)
switch migrationMsg.Action {
case actionBackup:
handleBackup(migrationMsg, ch, veleroClient)
case actionRestore:
handleRestore(migrationMsg, ch, veleroClient)
default:
log.Println("Invalid or missing 'Action' field in message")
}
}
// Processes a backup action
func handleBackup(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
currentTime := time.Now().Format("20060102150405")
backupName := fmt.Sprintf("%s-backup-%s", migrationMsg.SourceCluster, currentTime)
backup, err := velero.CreateBackup(backupName, migrationMsg.WorkloadID, veleroClient)
if err != nil {
log.Printf("Error creating the backup %v", err)
return
}
velero.WaitBackup(backup.Name, veleroClient)
log.Println("Backup completed successfully")
migrationMsg.Action = actionRestore
migrationMsg.BackupName = backup.Name
restoreMsg, err := yaml.Marshal(&migrationMsg)
if err != nil {
log.Printf("Error creating restore msg: %v", err)
return
}
rabbitmq.PublishMsg(ch, os.Getenv("MO_EXCHANGE"), migrationMsg.TargetCluster, restoreMsg)
}
// Processes a restore action
func handleRestore(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
err := velero.WaitBackup(migrationMsg.BackupName, veleroClient)
if err != nil {
log.Printf("Error syncing backup: %v", err)
return
}
currentTime := time.Now().Format("20060102150405")
restoreName := fmt.Sprintf("%s-restore-%s", migrationMsg.TargetCluster, currentTime)
restore, err := velero.CreateRestore(migrationMsg.BackupName, restoreName, veleroClient)
if err != nil {
log.Printf("Error creating restore: %v", err)
return
}
err = velero.WaitRestore(restore.Name, veleroClient)
if err != nil {
log.Printf("Error syncing restore: %v", err)
return
}
log.Println("Migration completed successfully")
workloadInfo := rabbitmq.WorkloadInfo{
WorkloadID: migrationMsg.WorkloadID,
Type: "migration",
SourceCluster: migrationMsg.SourceCluster,
TargetCluster: migrationMsg.TargetCluster,
Timestamp: currentTime,
}
intentApiExchange := os.Getenv("INTENTAPI_EXCHANGE")
intentApiUpdateRoutingKey := os.Getenv("INTENTAPI_UPDATE_RK")
intentApiNotifyRoutingKey := os.Getenv("INTENTAPI_NOTIFY_RK")
rabbitmq.PublishMsg(ch, intentApiExchange, intentApiUpdateRoutingKey, workloadInfo)
rabbitmq.PublishMsg(ch, intentApiExchange, intentApiNotifyRoutingKey, workloadInfo)
}