Skip to content
Snippets Groups Projects

psnc connectors

Merged Raul Palma requested to merge rapw3k/data-discovery-connectors:main into main
4 files
+ 187
0
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 41
0
# Ensure to import only the necessary library for connecting to the storage solution selected
import subprocess
try:
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
except ImportError:
subprocess.run(["pip", "install", "confluent-kafka"])
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
###
# Connect to Kafka based on config_conn params and return metadata_obj
###
def db_conn(schema_registry_url):
metadata_obj = []
try:
print("-> Trying to connect to Kafka...")
# Initialize Kafka client
schema_registry_client = SchemaRegistryClient({"url": schema_registry_url["endpoint"]})
# Retrieve list of subjects
subjects = schema_registry_client.get_subjects()
# Retrieve metadata for each subject
for subject in subjects:
metadata = schema_registry_client.get_latest_version(subject)
metadata_info = {
"subject": subject,
"schema": metadata.schema,
"schema_id": metadata.schema_id,
"version": metadata.version,
}
metadata_obj.append(metadata_info)
print("-> Metadata from Kafka collected.")
except Exception as error:
print("-> Error while connecting to Kafka:", error)
return metadata_obj
Loading