Saturday, August 29, 2020

Structured Streaming Data Storage in a Hive Table


In this post we look at how to read data from files using Spark Structured Streaming and store the output in a Hive table.



Build a Streaming App

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingSaveToHive {

  def main(args: Array[String]): Unit = {

    println("Structured Streaming Demo")

    val conf = new SparkConf().setAppName("Spark Structured Streaming").setMaster("local[*]")

    val spark = SparkSession.builder.config(conf).getOrCreate()

    println("Spark Session created")

    // Schema of the incoming CSV files
    val schema = StructType(Array(StructField("empId", StringType), StructField("empName", StringType)))

    // Create an "inputDir" directory (here C:\inputDir) and drop the CSV files into it;
    // the stream picks up each new file as it arrives
    val streamDF = spark.readStream.option("header", "true").schema(schema).csv("C:\\inputDir")

    // Write the stream as CSV files to the path the Hive table will point to;
    // the checkpoint location tracks which input files have already been processed
    val query = streamDF.writeStream.outputMode(OutputMode.Append()).format("csv")
      .option("path", "hivelocation").option("checkpointLocation", "location1").start()

    query.awaitTermination()

  }

}


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FuturexMiscSparkScala</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>

file1.txt

empId,empName
1,Chris
2,Neil

file2.txt

empId,empName
3,John
4,Paul

file3.txt

empId,empName
5,Kathy
6,Ana

You can create an external Hive table pointing to the "hivelocation" output path and see the data getting populated incrementally as each new file is dropped into the input directory, as sketched below.
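Here is a minimal sketch of what that could look like. It assumes the SparkSession is built with Hive support enabled; the object name, the table name employees, and the relative 'hivelocation' path are placeholders, and in practice LOCATION would be the fully qualified output directory used by the streaming query.

import org.apache.spark.sql.SparkSession

object CreateHiveTableOverStreamOutput {

  def main(args: Array[String]): Unit = {

    // Assumes Hive support is available (spark-hive is on the classpath, see pom.xml)
    val spark = SparkSession.builder
      .appName("Create Hive Table Over Stream Output")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // External table over the stream's output directory; each new CSV part file
    // appended by the streaming query becomes visible to queries on this table.
    // 'hivelocation' is a placeholder - use the full path of the output directory.
    spark.sql(
      """CREATE EXTERNAL TABLE IF NOT EXISTS employees (
        |  empId STRING,
        |  empName STRING
        |)
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |LOCATION 'hivelocation'""".stripMargin)

    // Re-running this query after dropping new files into the input directory
    // should show additional rows.
    spark.sql("SELECT * FROM employees").show()

    spark.stop()
  }
}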

To learn more about Spark Scala coding framework and best practices, check out our Udemy course https://www.udemy.com/course/spark-scala-coding-best-practices-data-pipeline/?referralCode=DBA026944F73C2D356CF
