diff --git a/cmd/migrate/migrate.go b/cmd/migrate/migrate.go index e53af7a2ebc37c8d02713f2030f252d4c3b0b50c..61357b3ac376bcb807ca03b9ac360184292165ba 100644 --- a/cmd/migrate/migrate.go +++ b/cmd/migrate/migrate.go @@ -55,16 +55,24 @@ func Run() { for msg := range msgs { processMessage(msg, ch, veleroClient) + log.Println("Awaiting messages...") } } -// processMessage handles incoming messages +// Handles incoming messages func processMessage(msg amqp091.Delivery, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) { + log.Println("Message received:") var migrationMsg rabbitmq.MigrationMsg if err := yaml.Unmarshal(msg.Body, &migrationMsg); err != nil { log.Printf("Error parsing message: %v", err) return } + fmt.Println("Action", migrationMsg.Action) + fmt.Println("SourceCluster", migrationMsg.SourceCluster) + fmt.Println("TargetCluster", migrationMsg.TargetCluster) + fmt.Println("BackupName", migrationMsg.BackupName) + fmt.Println("Timestamp", migrationMsg.Timestamp) + fmt.Println("WorkloadID", migrationMsg.WorkloadID) switch migrationMsg.Action { case actionBackup: @@ -76,12 +84,16 @@ func processMessage(msg amqp091.Delivery, ch *amqp091.Channel, veleroClient *vel } } -// handleBackup processes a backup action +// Processes a backup action func handleBackup(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) { currentTime := time.Now().Format("20060102150405") backupName := fmt.Sprintf("%s-backup-%s", migrationMsg.SourceCluster, currentTime) - backup := velero.CreateBackup(backupName, migrationMsg.WorkloadID, veleroClient) + backup, err := velero.CreateBackup(backupName, migrationMsg.WorkloadID, veleroClient) + if err != nil { + log.Printf("Error creating the backup %v", err) + return + } velero.WaitBackup(backup.Name, veleroClient) log.Println("Backup completed successfully") @@ -91,22 +103,35 @@ func handleBackup(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veler restoreMsg, err := yaml.Marshal(&migrationMsg) if err != nil { - log.Printf("Error marshalling YAML: %v", err) + log.Printf("Error creating restore msg: %v", err) return } rabbitmq.PublishMsg(ch, os.Getenv("MO_EXCHANGE"), migrationMsg.TargetCluster, restoreMsg) } -// handleRestore processes a restore action +// Processes a restore action func handleRestore(migrationMsg rabbitmq.MigrationMsg, ch *amqp091.Channel, veleroClient *veleroClientset.Clientset) { - velero.WaitBackup(migrationMsg.BackupName, veleroClient) + err := velero.WaitBackup(migrationMsg.BackupName, veleroClient) + if err != nil { + log.Printf("Error syncing backup: %v", err) + return + } currentTime := time.Now().Format("20060102150405") restoreName := fmt.Sprintf("%s-restore-%s", migrationMsg.TargetCluster, currentTime) - restore := velero.CreateRestore(migrationMsg.BackupName, restoreName, veleroClient) - velero.WaitRestore(restore.Name, veleroClient) + restore, err := velero.CreateRestore(migrationMsg.BackupName, restoreName, veleroClient) + if err != nil { + log.Printf("Error creating restore: %v", err) + return + } + + err = velero.WaitRestore(restore.Name, veleroClient) + if err != nil { + log.Printf("Error syncing restore: %v", err) + return + } log.Println("Migration completed successfully") diff --git a/pkg/velero/velero.go b/pkg/velero/velero.go index fd9b1d1d5dbce27af5a08a6bd9c96d680b20909a..f3cc1786b73b0169ab8ec4612873491a886d7dcc 100644 --- a/pkg/velero/velero.go +++ b/pkg/velero/velero.go @@ -20,8 +20,6 @@ package velero import ( "context" "fmt" - "log" - "os" "time" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" @@ -34,43 +32,41 @@ const ( pollingInterval = 20 ) -func WaitBackup(backupName string, veleroClientset *veleroClientset.Clientset) { +func WaitBackup(backupName string, veleroClientset *veleroClientset.Clientset) error { startTime := time.Now() for { backup, _ := veleroClientset.VeleroV1().Backups("nemo-kernel").Get(context.TODO(), backupName, metav1.GetOptions{}) if backup.Status.Phase == "Completed" { - break + return nil } else { fmt.Println("Backup is still in progress. Waiting...") time.Sleep(pollingInterval * time.Second) currentTime := time.Now() if currentTime.Sub(startTime) > time.Second*time.Duration(timeoutSeconds) { - fmt.Println("Timeout reached. Backup did not complete within the specified time.") - os.Exit(1) + return fmt.Errorf("Timeout reached. Backup did not complete within the specified time (%ds).", timeoutSeconds) } } } } -func WaitRestore(restoreName string, veleroClientset *veleroClientset.Clientset) { +func WaitRestore(restoreName string, veleroClientset *veleroClientset.Clientset) error { startTime := time.Now() for { restore, _ := veleroClientset.VeleroV1().Restores("nemo-kernel").Get(context.TODO(), restoreName, metav1.GetOptions{}) if restore.Status.Phase == "Completed" { - break + return nil } else { fmt.Println("Restore is still in progress. Waiting...") time.Sleep(pollingInterval * time.Second) currentTime := time.Now() if currentTime.Sub(startTime) > time.Second*time.Duration(timeoutSeconds) { - fmt.Println("Timeout reached. Restore did not complete within the specified time.") - os.Exit(1) + return fmt.Errorf("Timeout reached. Restore did not complete within the specified time (%ds).", timeoutSeconds) } } } } -func CreateBackup(backupName string, workload string, veleroClientset *veleroClientset.Clientset) *velerov1.Backup { +func CreateBackup(backupName string, workload string, veleroClientset *veleroClientset.Clientset) (*velerov1.Backup, error) { // Define the backup object backup := &velerov1.Backup{ ObjectMeta: metav1.ObjectMeta{ @@ -86,13 +82,10 @@ func CreateBackup(backupName string, workload string, veleroClientset *veleroCli } // Create the backup createdBackup, err := veleroClientset.VeleroV1().Backups("nemo-kernel").Create(context.Background(), backup, metav1.CreateOptions{}) - if err != nil { - panic(err) - } - return createdBackup + return createdBackup, err } -func CreateRestore(backupName string, restoreName string, veleroClientset *veleroClientset.Clientset) *velerov1.Restore { +func CreateRestore(backupName string, restoreName string, veleroClientset *veleroClientset.Clientset) (*velerov1.Restore, error) { // Define the restore object restore := &velerov1.Restore{ ObjectMeta: metav1.ObjectMeta{ @@ -105,8 +98,7 @@ func CreateRestore(backupName string, restoreName string, veleroClientset *veler // Apply the restore object to the cluster createdRestore, err := veleroClientset.VeleroV1().Restores("nemo-kernel").Create(context.Background(), restore, metav1.CreateOptions{}) if err != nil { - log.Printf("Error creating restore: %v", err) - return nil + return nil, err } - return createdRestore + return createdRestore, nil }