Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
migrate.go 4.90 KiB
/*******************************************************************************
 * Copyright 2023 Bull SAS
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 ******************************************************************************/

package migrate

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/rabbitmq/amqp091-go"
	veleroClientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
	"gopkg.in/yaml.v3"

	"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/intent-based-migration-controller/ibmc-agent/pkg/velero"
	"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/meta-orchestrator/deployment-controller/pkg/k8s"
	"gitlab.eclipse.org/eclipse-research-labs/nemo-project/nemo-kernel/meta-orchestrator/deployment-controller/pkg/rabbitmq"
)

// Action names used by this package when handling migration messages.
const (
	// actionBackup is the Action value that processMessage routes to handleBackup.
	actionBackup    = "backup"
	// actionRestore is the Action value that processMessage routes to
	// handleRestore; handleBackup also sets it on the message it publishes.
	actionRestore   = "restore"
	// NOTE(review): actionMigration is not referenced in the visible code;
	// handleRestore uses the literal "migration" for WorkloadInfo.Type —
	// confirm whether this constant was meant to be used there.
	actionMigration = "migration"
)

// Run is the entry point of the migration agent. It builds the Kubernetes and
// Velero clientsets, looks up the spoke cluster name, sets up the RabbitMQ
// connection and queue, and then passes every received delivery to
// processMessage, one at a time. It returns when the range over the
// deliveries ends.
func Run() {
	// Cluster-side clients and identity.
	kubeClient := k8s.Buildk8sClientset()
	vc := k8s.BuildVeleroClientset()
	spoke := k8s.GetSpokeClusterName(kubeClient)

	// Broker side. Deferred calls run in reverse order, so the channel is
	// closed before the connection it belongs to.
	connection, channel := rabbitmq.SetupRabbitMQ()
	defer connection.Close()
	defer channel.Close()

	exchange := os.Getenv("MO_EXCHANGE")
	deliveries := rabbitmq.SetupQueue(channel, exchange, spoke)

	const waiting = "Awaiting messages..."
	log.Println(waiting)
	for delivery := range deliveries {
		processMessage(delivery, channel, vc)
		log.Println(waiting)
	}
}

// processMessage decodes the body of one delivery as YAML into a
// rabbitmq.MigrationMsg, prints its fields and dispatches on the Action
// field: actionBackup goes to handleBackup and actionRestore goes to
// handleRestore. A body that fails to parse, or any other Action value, is
// logged and otherwise ignored.
//
// msg is the received delivery, ch is the channel handed on to the handlers
// for publishing, and veleroClient is the Velero clientset they operate on.
func processMessage(msg amqp091.Delivery, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
	log.Println("Message received:")
	// msg.Body is a byte slice; convert it so the payload is printed as text
	// rather than as a list of decimal byte values.
	fmt.Println("\tMsg Body: ", string(msg.Body))

	var migrationMsg rabbitmq.MigrationMsg
	if err := yaml.Unmarshal(msg.Body, &migrationMsg); err != nil {
		log.Printf("Error parsing message: %v", err)
		return
	}

	fmt.Println("\tWorkloadID: ", migrationMsg.WorkloadID)
	fmt.Println("\tAction: ", migrationMsg.Action)
	fmt.Println("\tSourceCluster: ", migrationMsg.SourceCluster)
	fmt.Println("\tTargetCluster: ", migrationMsg.TargetCluster)

	switch migrationMsg.Action {
	case actionBackup:
		handleBackup(migrationMsg, ch, veleroClient)
	case actionRestore:
		handleRestore(migrationMsg, ch, veleroClient)
	default:
		log.Println("Invalid or missing 'Action' field in message")
	}
}

// handleBackup processes a backup action. It creates a Velero backup named
// "<SourceCluster>-backup-<timestamp>" for the message's WorkloadID and waits
// on it. It then rewrites the message as a restore request (Action set to
// actionRestore, BackupName set to the created backup's name), marshals it to
// YAML and publishes it on the MO_EXCHANGE exchange with the target cluster
// as routing key.
//
// Any failure (creating the backup, waiting for it, or marshalling the
// follow-up message) is logged and aborts the function, so no restore request
// is published for a backup that did not complete.
func handleBackup(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
	// Timestamp layout yyyyMMddHHmmss keeps backup names unique per second.
	currentTime := time.Now().Format("20060102150405")
	backupName := fmt.Sprintf("%s-backup-%s", migrationMsg.SourceCluster, currentTime)

	backup, err := velero.CreateBackup(backupName, migrationMsg.WorkloadID, veleroClient)
	if err != nil {
		log.Printf("Error creating the backup %v", err)
		return
	}

	// WaitBackup reports an error (handleRestore checks it too); do not
	// announce success or trigger the restore when waiting failed.
	if err := velero.WaitBackup(backup.Name, veleroClient); err != nil {
		log.Printf("Error waiting for backup %q: %v", backup.Name, err)
		return
	}

	log.Println("Backup completed successfully")

	// Turn the incoming message into the restore request for the target cluster.
	migrationMsg.Action = actionRestore
	migrationMsg.BackupName = backup.Name

	restoreMsg, err := yaml.Marshal(&migrationMsg)
	if err != nil {
		log.Printf("Error creating restore msg: %v", err)
		return
	}

	rabbitmq.PublishMsg(ch, os.Getenv("MO_EXCHANGE"), migrationMsg.TargetCluster, restoreMsg)
}

// handleRestore processes a restore action. It waits on the backup named in
// the message, creates a Velero restore called
// "<TargetCluster>-restore-<timestamp>" from it and waits on that restore.
// On success it publishes a rabbitmq.WorkloadInfo record of type "migration"
// on the INTENTAPI_EXCHANGE exchange, once with the INTENTAPI_UPDATE_RK
// routing key and once with INTENTAPI_NOTIFY_RK.
//
// Each failing step is logged and ends the function without publishing.
func handleRestore(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) {
	if waitErr := velero.WaitBackup(migrationMsg.BackupName, veleroClient); waitErr != nil {
		log.Printf("Error syncing backup: %v", waitErr)
		return
	}

	// Taken before the restore is created; reused below as the record's Timestamp.
	stamp := time.Now().Format("20060102150405")
	name := fmt.Sprintf("%s-restore-%s", migrationMsg.TargetCluster, stamp)

	created, createErr := velero.CreateRestore(migrationMsg.BackupName, name, veleroClient)
	if createErr != nil {
		log.Printf("Error creating restore: %v", createErr)
		return
	}

	if waitErr := velero.WaitRestore(created.Name, veleroClient); waitErr != nil {
		log.Printf("Error syncing restore: %v", waitErr)
		return
	}

	log.Println("Migration completed successfully")

	info := rabbitmq.WorkloadInfo{
		WorkloadID:    migrationMsg.WorkloadID,
		Type:          "migration",
		SourceCluster: migrationMsg.SourceCluster,
		TargetCluster: migrationMsg.TargetCluster,
		Timestamp:     stamp,
	}

	// Same record goes out twice: first under the update key, then the notify key.
	exchange := os.Getenv("INTENTAPI_EXCHANGE")
	routingKeys := []string{
		os.Getenv("INTENTAPI_UPDATE_RK"),
		os.Getenv("INTENTAPI_NOTIFY_RK"),
	}
	for _, rk := range routingKeys {
		rabbitmq.PublishMsg(ch, exchange, rk, info)
	}
}