A Big Data Hadoop and Spark Project for Absolute Beginners
In this post, we look at how you can read data from files using Spark Structured Streaming and store the output in a Hive table.
Build a Streaming App
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingSaveToHive {
  def main(args: Array[String]): Unit = {
    println("Structured Streaming Demo")

    val conf = new SparkConf().setAppName("Spark Structured Streaming").setMaster("local[*]")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    println("Spark Session created")

    // Schema of the incoming CSV files
    val schema = StructType(Array(StructField("empId", StringType), StructField("empName", StringType)))

    // Create an "inputDir" directory under C:\ and keep it empty to start with
    val streamDF = spark.readStream.option("header", "true").schema(schema).csv("C:\\inputDir")

    // Write each micro-batch as CSV files under "hivelocation" (relative to the project root);
    // the checkpoint directory lets the query recover from where it left off
    val query = streamDF.writeStream.outputMode(OutputMode.Append()).format("csv")
      .option("path", "hivelocation").option("checkpointLocation", "location1").start()

    query.awaitTermination()
  }
}
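The csv sink above only drops files into a directory. If you would rather write straight into a managed Hive table, Spark 2.4 added foreachBatch, which hands each micro-batch to the regular batch writer. Below is a minimal sketch of that variant, assuming a Hive-enabled session; the table name "emp" and checkpoint directory "location2" are placeholders:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingForeachBatch {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport is needed so saveAsTable goes through the Hive metastore
    val spark = SparkSession.builder
      .appName("Spark Structured Streaming to Hive")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    val schema = StructType(Array(StructField("empId", StringType), StructField("empName", StringType)))
    val streamDF = spark.readStream.option("header", "true").schema(schema).csv("C:\\inputDir")

    // foreachBatch exposes each micro-batch as a plain DataFrame,
    // so the normal batch API can append it to a Hive table
    val query = streamDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write.mode("append").saveAsTable("emp") // "emp" is a placeholder table name
      }
      .option("checkpointLocation", "location2")
      .start()

    query.awaitTermination()
  }
}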
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FuturexMiscSparkScala</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.3</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
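One caveat: the pom above only declares dependencies. For Maven to actually compile Scala sources, you normally also add a Scala compiler plugin inside the <project> element. A minimal sketch using the widely used scala-maven-plugin (the version shown is just one known release; pick whatever is current on Maven Central):

<build>
    <plugins>
        <!-- compiles src/main/scala and src/test/scala -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.3.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>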
- Keep the C:\inputDir directory empty to begin with.
- Start the program; it will sit and wait for files to stream in.
- Then copy the files (file1, file2, file3 shown below) into C:\inputDir one at a time and watch the output land in the "hivelocation" directory under your project root folder.
file1.txt
empId,empName
1,Chris
2,Neil
file2.txt
empId,empName
3,John
4,Paul
file3.txt
empId,empName
5,Kathy
6,Ana
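If you want to sanity-check that the files are being picked up before involving Hive at all, you can temporarily swap the csv sink in the program above for a console sink, which prints every micro-batch. A debugging sketch (it replaces the writeStream call in the main program):

// Debug variant: print each micro-batch to the console instead of writing files
val debugQuery = streamDF.writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .start()

debugQuery.awaitTermination()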
You can create a Hive table pointing at the "hivelocation" directory and see the data getting populated incrementally.
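For example, you could register an external table over that directory from a Hive-enabled Spark session. A sketch, assuming a placeholder table name "emp_csv" and that you substitute the real absolute path of "hivelocation" on your machine:

// Run once against a Hive-enabled SparkSession; adjust the LOCATION path
spark.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS emp_csv (empId STRING, empName STRING)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |STORED AS TEXTFILE
    |LOCATION 'file:///C:/your/project/root/hivelocation'""".stripMargin)

// Each new micro-batch written by the stream shows up in subsequent queries
spark.sql("SELECT * FROM emp_csv").show()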
To learn more about Spark Scala coding frameworks and best practices, check out our Udemy course: https://www.udemy.com/course/spark-scala-coding-best-practices-data-pipeline/?referralCode=DBA026944F73C2D356CF