Saturday, August 29, 2020

Structured Streaming Data Storage in a Hive Table









In this post, we talk about how you can read data from files using Spark Structured Streaming and store the output in a Hive table.



Build a Streaming App

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}


object StructuredStreamingSaveToHive {

  def main(args: Array[String]): Unit = {

    println("Structured Streaming Demo")

    val conf = new SparkConf().setAppName("Spark Structured Streaming").setMaster("local[*]")

    val spark = SparkSession.builder.config(conf).getOrCreate()

    println("Spark Session created")

    // Schema of the incoming CSV files
    val schema = StructType(Array(StructField("empId", StringType), StructField("empName", StringType)))

    // Create an "inputDir" folder (here C:\inputDir) and drop CSV files into it;
    // the file source treats every new file as fresh streaming input
    val streamDF = spark.readStream.option("header", "true").schema(schema).csv("C:\\inputDir")

    // Append each micro-batch as CSV files under "hivelocation";
    // a checkpoint location is mandatory for the file sink
    val query = streamDF.writeStream.outputMode(OutputMode.Append()).format("csv")
      .option("path", "hivelocation").option("checkpointLocation", "location1").start()

    query.awaitTermination()
  }
}
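
Note that the file sink above writes plain CSV files, and the Hive table is layered on top of that directory afterwards. If you would rather have Spark insert into an existing Hive table directly, one common alternative is foreachBatch (available since Spark 2.4). The following is a minimal sketch, not part of the main example: it reuses streamDF from above and assumes the SparkSession was built with enableHiveSupport() and that a Hive table named employee with matching columns already exists.

import org.apache.spark.sql.DataFrame

// Alternative sketch: write each micro-batch straight into a Hive table.
// Assumes .enableHiveSupport() on the SparkSession and an existing
// Hive table "employee" (empId STRING, empName STRING).
val directQuery = streamDF.writeStream
  .outputMode(OutputMode.Append())
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Each micro-batch is a regular DataFrame, so the batch writer API applies
    batchDF.write.mode("append").insertInto("employee")
  }
  .option("checkpointLocation", "location2")
  .start()

directQuery.awaitTermination()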


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>FuturexMiscSparkScala</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

file1.txt

empId,empName
1,Chris
2,Neil

file2.txt

empId,empName
3,John
4,Paul

file3.txt

empId,empName
5,Kathy
6,Ana

You can create a Hive external table pointing to the “hivelocation” directory and see the data getting populated incrementally as each file arrives.
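
For example, a minimal sketch of that table definition, run here through spark.sql (it could equally be run in the Hive shell). The table name employee_stream is hypothetical, the LOCATION path is a placeholder for wherever “hivelocation” actually resolves, and this requires a SparkSession built with enableHiveSupport().

// Hypothetical table name; point LOCATION at the actual output directory
spark.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS employee_stream (
    |  empId STRING,
    |  empName STRING
    |)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |STORED AS TEXTFILE
    |LOCATION '/path/to/hivelocation'""".stripMargin)

// Each completed micro-batch adds new CSV files under the location,
// so re-running this query shows the newly arrived rows
spark.sql("SELECT * FROM employee_stream").show()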

To learn more about Spark Scala coding frameworks and best practices, check out our Udemy course https://www.udemy.com/course/spark-scala-coding-best-practices-data-pipeline/?referralCode=DBA026944F73C2D356CF




