Databricks Example

Because Databricks functions as an external system, you can develop your own business logic using Spark within it. In this context, Macrometa serves as both a source (providing input data) and a target (persisting output data) from the perspective of Databricks.

This example shows you how to read data from a Macrometa stream and write data to a Macrometa collection using the Macrometa Databricks Client Connector.
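
The snippets below assume that the Macrometa Databricks Client Connector JAR is attached to your cluster and that the connection values are already defined in scope. The following is a minimal sketch of those preliminaries; the placeholder values are hypothetical and should be replaced with your own Macrometa GDN endpoint and credentials.

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    // Hypothetical placeholder values; replace them with your own endpoint and credentials.
    val regionUrl = "<YOUR_REGION_URL>"
    val apikey = "<YOUR_API_KEY>"
    val fabric = "<YOUR_FABRIC>"
    val tenant = "<YOUR_TENANT>"
    val replication = "global" // or "local", depending on how the stream was created
    val sourceStream = "<YOUR_SOURCE_STREAM>"
    val sourceSubscription = "<YOUR_SUBSCRIPTION_NAME>"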

Read from a Macrometa Stream

  1. Set up your source options:

    val sourceOptions = Map(
      "regionUrl" -> regionUrl,
      "port" -> "6651",
      "apikey" -> apikey,
      "fabric" -> fabric,
      "tenant" -> tenant,
      "replication" -> replication,
      "stream" -> sourceStream,
      "subscriptionName" -> sourceSubscription
    )
  2. Create a Spark session:

    val spark = SparkSession.builder()
      .appName("MacrometaCollectionStreamingApp")
      .master("local[*]")
      .getOrCreate()
  3. Read from the Macrometa stream:

    val inputStream = spark.readStream
      .format("com.macrometa.spark.stream.MacrometaTableProvider")
      .options(sourceOptions)
      .load()
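
Optionally, before wiring the stream to a collection sink, you can confirm that records are arriving by attaching Spark's built-in console sink to the same `inputStream`. This is a debugging sketch only, not part of the connector:

    // Optional sanity check: print incoming records to the console.
    // Run this instead of (not alongside) the collection sink in the next section.
    val debugQuery = inputStream.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
    debugQuery.awaitTermination()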

Write to a Macrometa Collection

  1. Set up your target options:

    val targetOptions = Map(
      "regionUrl" -> regionUrl,
      "apiKey" -> apikey,
      "fabric" -> fabric,
      "collection" -> "<YOUR_TARGET_COLLECTION>",
      "batchSize" -> "100"
    )
  2. Write to the Macrometa collection:

    val query = inputStream.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // Write each micro-batch to the Macrometa collection.
        batchDF.write
          .format("com.macrometa.spark.collection.MacrometaTableProvider")
          .options(targetOptions)
          .mode(SaveMode.Append)
          .save()
      }
      .option("checkpointLocation", "checkpoint")
      .start()
  3. Wait for the streaming query to terminate:

    query.awaitTermination()
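
`query.awaitTermination()` blocks until the streaming query stops or fails. When you want to shut the pipeline down cleanly, stop the query and close the Spark session from elsewhere, for example another notebook cell or a shutdown hook. A short sketch:

    // Run from another notebook cell or a shutdown hook:
    // stop the streaming query, then release cluster resources.
    query.stop()
    spark.stop()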