Problem Scenario 46 : You have been given the below list in Scala (name, sex, cost) for each
work done.
List( ("Deeapak", "male", 4000), ("Deepak", "male", 2000), ("Deepika", "female",
2000), ("Deepak", "female", 2000), ("Deepak", "male", 1000), ("Neeta", "female", 2000))
Now write a Spark program to load this list as an RDD and sum the cost for each
combination of name and sex (as key).

Answer: See the explanation for Step by Step Solution and configuration.
Solution :
Step 1 : Create an RDD out of this list
val rdd = sc.parallelize(List( ("Deeapak", "male", 4000), ("Deepak", "male", 2000),
("Deepika", "female", 2000), ("Deepak", "female", 2000), ("Deepak", "male", 1000),
("Neeta", "female", 2000)))
Step 2 : Convert this RDD into a pair RDD
val byKey = rdd.map({case (name,sex,cost) => (name,sex)->cost})
Step 3 : Now group by Key
val byKeyGrouped = byKey.groupByKey
Step 4 : Now sum the cost for each group
val result = byKeyGrouped.map{case ((id1,id2),values) => (id1,id2,values.sum)}
Step 5 : Save the results
result.repartition(1).saveAsTextFile("spark12/result.txt")
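The expected totals from Steps 2 to 4 can be checked without a cluster. The sketch below is a plain Scala collections model of the same group-and-sum, with no Spark involved; the object name CostByNameAndSex and its member names are illustrative assumptions, not part of the original solution. On the RDD itself, byKey.reduceByKey(_ + _) should give the same per-key totals while combining values before the shuffle.

```scala
// Plain Scala model of Steps 2-4; all names here are illustrative assumptions.
object CostByNameAndSex {
  // Same records as the list given in the problem statement.
  val records: List[(String, String, Int)] = List(
    ("Deeapak", "male", 4000), ("Deepak", "male", 2000),
    ("Deepika", "female", 2000), ("Deepak", "female", 2000),
    ("Deepak", "male", 1000), ("Neeta", "female", 2000))

  // Step 2: key each record by (name, sex).
  val byKey: List[((String, String), Int)] =
    records.map { case (name, sex, cost) => (name, sex) -> cost }

  // Steps 3-4: group by key, then sum the costs in each group.
  val totals: Map[(String, String), Int] =
    byKey.groupBy(_._1).map { case (key, pairs) => key -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit =
    totals.foreach { case ((name, sex), sum) => println(name + "," + sex + "," + sum) }
}
```

Note that "Deeapak" and "Deepak" are distinct names in the input, so they end up under separate keys.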

Problem Scenario 41 : You have been given the below code snippet.
val au1 = sc.parallelize(List (("a" , Array(1,2)), ("b" , Array(1,2))))
val au2 = sc.parallelize(List (("a" , Array(3)), ("b" , Array(2))))
Apply the Spark method which will generate the below output.
Array[(String, Array[Int])] = Array((a,Array(1, 2)), (b,Array(1, 2)), (a,Array(3)), (b,Array(2)))

Answer: See the explanation for Step by Step Solution and configuration.
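The expected output is the elements of the first RDD followed by those of the second, with duplicate keys kept, which is what a union produces; on the RDDs this would presumably be au1.union(au2).collect (or au1 ++ au2). A minimal sketch of the same behaviour with plain Scala lists, assuming the object name UnionSketch purely for illustration:

```scala
// Plain Scala model of an RDD union: concatenation that keeps duplicates.
// UnionSketch is an illustrative name, not part of the original material.
object UnionSketch {
  val au1 = List(("a", Array(1, 2)), ("b", Array(1, 2)))
  val au2 = List(("a", Array(3)), ("b", Array(2)))

  // Spark equivalent on RDDs (in spark-shell): au1.union(au2).collect
  val combined: List[(String, Array[Int])] = au1 ++ au2

  def main(args: Array[String]): Unit =
    combined.foreach { case (key, values) =>
      println("(" + key + ",Array(" + values.mkString(", ") + "))")
    }
}
```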

Problem Scenario 60 : You have been given below code snippet.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)),
(6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)),
(6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)),
(3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

Answer: See the explanation for Step by Step Solution and configuration.
join [Pair] : Performs an inner join using two key-value RDDs. Please note that the keys must be generally comparable to make this work.
keyBy : Constructs two-component tuples (key-value pairs) by applying a function on each data item. The result of the function
becomes the key and the original data item becomes the value of the newly created tuples.
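Since the explanation names join, the requested operation is presumably b.join(d).collect. The sketch below models keyBy and an inner join on plain Scala lists so the expected pairs can be checked without Spark; JoinSketch, keyByLength and innerJoin are hypothetical names introduced here. In Spark the order of the joined pairs depends on partitioning, so only the contents are comparable, not the order.

```scala
// Plain Scala model of keyBy + inner join; helper names are illustrative.
object JoinSketch {
  // Models keyBy(_.length): the function result becomes the key,
  // the original item becomes the value.
  def keyByLength(xs: List[String]): List[(Int, String)] =
    xs.map(s => (s.length, s))

  // Inner join: emit (k, (v, w)) for every pair of entries sharing key k.
  def innerJoin[K, V, W](left: List[(K, V)], right: List[(K, W)]): List[(K, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  val b = keyByLength(List("dog", "salmon", "salmon", "rat", "elephant"))
  val d = keyByLength(
    List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"))

  // Spark equivalent on RDDs (in spark-shell): b.join(d).collect
  val joined: List[(Int, (String, String))] = innerJoin(b, d)

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

Keys that appear on only one side (length 8 for "elephant", length 4 for "wolf" and "bear") drop out, which is why the desired output contains only keys 6 and 3.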

Problem Scenario 13 : You have been given the following mysql database details as well as
other info.
jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Please accomplish the following.
1. Create a table in retail_db with the following definition.
CREATE table departments_export (department_id int(11), department_name varchar(45),
created_date TIMESTAMP DEFAULT NOW());
2. Now import the data from the following directory into the departments_export table:
/user/cloudera/departments_new

Answer: See the explanation for Step by Step Solution and configuration.
Solution :
Step 1 : Login to mysql db
mysql --user=retail_dba --password=cloudera
show databases; use retail_db; show tables;
Step 2 : Create a table as given in the problem statement.
CREATE table departments_export (department_id int(11), department_name varchar(45),
created_date TIMESTAMP DEFAULT NOW());
show tables;
Step 3 : Export data from /user/cloudera/departments_new to the new table departments_export
sqoop export --connect jdbc:mysql://quickstart:3306/retail_db \
--username retail_dba \
--password cloudera \
--table departments_export \
--export-dir /user/cloudera/departments_new
Step 4 : Now check whether the export was done correctly.
mysql --user=retail_dba --password=cloudera
show databases;
use retail_db;
show tables;
select * from departments_export;

Problem Scenario 23 : You have been given a log generating service as below.
start_logs (It will generate continuous logs)
tail_logs (You can check what logs are being generated)
stop_logs (It will stop the log service)
Path where logs are generated using above service : /opt/gen_logs/logs/access.log
Now write a Flume configuration file named flume3.conf and, using that configuration file, dump
the logs into the HDFS file system in a directory called flume3/%Y/%m/%d/%H/%M
(meaning a new directory should be created every minute). Please use the interceptors to
provide timestamp information if the message header does not already have it.
Also note that you have to preserve the existing timestamp if the message contains one. The Flume
channel should have the following properties as well: it should commit after every 100 messages,
use a non-durable/faster channel, and be able to hold a maximum of 1000 events.
Answer: See the explanation for Step by Step Solution and configuration.
Solution :
Step 1 : Create flume configuration file, with below configuration for source, sink and channel.
# Define source, sink, channel and agent.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /opt/gen_logs/logs/access.log
# Define interceptors
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
agent1.sources.source1.interceptors.i1.preserveExisting = true
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = flume3/%Y/%m/%d/%H/%M
agent1.sinks.sink1.hdfs.fileType = DataStream
# Now we need to define channel1 property.
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Step 2 : Run the below command, which will use this configuration file and append data in HDFS.
Start log service using : start_logs
Start flume service:
flume-ng agent --conf /home/cloudera/flumeconf --conf-file \
/home/cloudera/flumeconf/flume3.conf -Dflume.root.logger=DEBUG,INFO,console --name agent1
Wait for a few minutes and then stop the log service.
stop_logs

