Hello,
I am running a spark streaming program on a dataset which is a sequence of
numbers in a text file format. I read the text file and load it into a Kafka
topic and then run the Spark streaming program on the DStream and finally
write the result into an output text file. But I'm getting almost totally
different result compared to run the program without Spark streaming.
I'm using maPartitions and it seems it shuffles the data and messes it up.
Here is my code in Spark streaming and using Kafka:
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import pandas as pd
#---------------
def classic_sta_lta_py(a):
nsta = 2
nlta = 30
#
print("a=", a)
sta = np.cumsum(a ** 2)
print("sta1=", sta)
# sta = np.cumsum(a * a, dtype=float)
# print("{}. sta array is: ".format(sta))
# Convert to float
sta = np.require(sta, dtype=np.float)
print("sta2=", sta)
# Copy for LTA
lta = sta.copy()
print("lta=", lta)
# Compute the STA and the LTA
sta[nsta:] = sta[nsta:] - sta[:-nsta]
sta /= nsta
lta[nlta:] = lta[nlta:] - lta[:-nlta]
lta /= nlta
# Pad zeros
sta[:nlta - 1] = 0
# Avoid division by zero by setting zero values to tiny float
dtiny = np.finfo(0.0).tiny
idx = lta < dtiny
lta[idx] = dtiny
return sta / lta
#---------------
def trigger_onset(charfct):
"""
Calculate trigger on and off times.
Given thres1 and thres2 calculate trigger on and off times from
characteristic function.
This method is written in pure Python and gets slow as soon as there
are more then 1e6 triggerings ("on" AND "off") in charfct --- normally
this does not happen.
:type charfct: NumPy :class:`~numpy.ndarray`
:param charfct: Characteristic function of e.g. STA/LTA trigger
:type thres1: float
:param thres1: Value above which trigger (of characteristic function)
is activated (higher threshold)
:type thres2: float
:param thres2: Value below which trigger (of characteristic function)
is deactivated (lower threshold)
:type max_len: int
:param max_len: Maximum length of triggered event in samples. A new
event will be triggered as soon as the signal reaches
again above thres1.
:type max_len_delete: bool
:param max_len_delete: Do not write events longer than max_len into
report file.
:rtype: List
:return: Nested List of trigger on and of times in samples
"""
# 1) find indices of samples greater than threshold
# 2) calculate trigger "of" times by the gap in trigger indices
# above the threshold i.e. the difference of two following indices
# in ind is greater than 1
# 3) in principle the same as for "of" just add one to the index to get
# start times, this operation is not supported on the compact
# syntax
# 4) as long as there is a on time greater than the actual of time find
# trigger on states which are greater than last of state an the
# corresponding of state which is greater than current on state
# 5) if the signal stays above thres2 longer than max_len an event
# is triggered and following a new event can be triggered as soon as
# the signal is above thres1
thres1 = 4
thres2 = 2
max_len = 9e99
max_len_delete = False
#charfct = []
# for x in iterator:
# print(x)
# charfct.append(x)
ind1 = np.where(charfct > thres1)[0]
if len(ind1) == 0:
return []
ind2 = np.where(charfct > thres2)[0]
#
on = deque([ind1[0]])
of = deque([-1])
# determine the indices where charfct falls below off-threshold
ind2_ = np.empty_like(ind2, dtype=bool)
ind2_[:-1] = np.diff(ind2) > 1
# last occurence is missed by the diff, add it manually
ind2_[-1] = True
of.extend(ind2[ind2_].tolist())
on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist())
# include last pick if trigger is on or drop it
if max_len_delete:
# drop it
of.extend([1e99])
on.extend([on[-1]])
else:
# include it
of.extend([ind2[-1]])
#
pick = []
while on[-1] > of[0]:
while on[0] <= of[0]:
on.popleft()
while of[0] < on[0]:
of.popleft()
if of[0] - on[0] > max_len:
if max_len_delete:
on.popleft()
continue
of.appendleft(on[0] + max_len)
pick.append([on[0], of[0]])
return np.array(pick, dtype=np.int64)
# #---------------
def saveRec(rdd):
rdd.foreach(lambda rec:
open("/Users/zeinab/kafka_2.11-1.1.0/outputFile4.txt", "a").write(rec+"\n"))
if __name__ == "__main__":
print("hello spark")
sc = SparkContext(appName="STALTA")
ssc = StreamingContext(sc, 5)
broker, topic = sys.argv[1:]
# Connect to Kafka
kvs = KafkaUtils.createStream(ssc, broker,
"raw-event-streaming-consumer",{topic:1})
lines = kvs.map(lambda x: x[1])
ds = lines.flatMap(lambda line: line.strip().split("\n")).map(lambda
strelem: float(strelem))
mapped = ds.mapPartitions(lambda i:
classic_sta_lta_py(np.array(list(i))))
mapped1 = mapped.mapPartitions(lambda j:
trigger_onset(np.array(list(j))))
lines2 = mapped1.map(lambda y: y)
mapped2 = lines2.map(lambda w: str(w))
mapped2.count().pprint()
mapped2.foreachRDD(saveRec)
ssc.start()
ssc.awaitTermination()
And here is my code without Spark streaming:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------
# Filename: trigger.py
# Purpose: Python trigger/picker routines for seismology.
# Author: Moritz Beyreuther, Tobias Megies
# Email: [email protected]
#
# Copyright (C) 2008-2012 Moritz Beyreuther, Tobias Megies
# -------------------------------------------------------------------
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
from collections import deque
import ctypes as C
import warnings
import numpy as np
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy.signal.headers import clibsignal, head_stalta_t
from numpy import genfromtxt
from obspy import read
def classic_sta_lta_py(a, nsta, nlta):
"""
Computes the standard STA/LTA from a given input array a. The length of
the STA is given by nsta in samples, respectively is the length of the
LTA given by nlta in samples. Written in Python.
.. note::
There exists a faster version of this trigger wrapped in C
called :func:`~obspy.signal.trigger.classic_sta_lta` in this module!
:type a: NumPy :class:`~numpy.ndarray`
:param a: Seismic Trace
:type nsta: int
:param nsta: Length of short time average window in samples
:type nlta: int
:param nlta: Length of long time average window in samples
:rtype: NumPy :class:`~numpy.ndarray`
:return: Characteristic function of classic STA/LTA
"""
# The cumulative sum can be exploited to calculate a moving average (the
# cumsum function is quite efficient)
sta = np.cumsum(a ** 2)
# Convert to float
sta = np.require(sta, dtype=np.float)
# Copy for LTA
lta = sta.copy()
# Compute the STA and the LTA
sta[nsta:] = sta[nsta:] - sta[:-nsta]
sta /= nsta
lta[nlta:] = lta[nlta:] - lta[:-nlta]
lta /= nlta
# Pad zeros
sta[:nlta - 1] = 0
# Avoid division by zero by setting zero values to tiny float
dtiny = np.finfo(0.0).tiny
idx = lta < dtiny
lta[idx] = dtiny
return sta / lta
def trigger_onset(charfct, thres1, thres2, max_len=9e99,
max_len_delete=False):
"""
Calculate trigger on and off times.
Given thres1 and thres2 calculate trigger on and off times from
characteristic function.
This method is written in pure Python and gets slow as soon as there
are more then 1e6 triggerings ("on" AND "off") in charfct --- normally
this does not happen.
:type charfct: NumPy :class:`~numpy.ndarray`
:param charfct: Characteristic function of e.g. STA/LTA trigger
:type thres1: float
:param thres1: Value above which trigger (of characteristic function)
is activated (higher threshold)
:type thres2: float
:param thres2: Value below which trigger (of characteristic function)
is deactivated (lower threshold)
:type max_len: int
:param max_len: Maximum length of triggered event in samples. A new
event will be triggered as soon as the signal reaches
again above thres1.
:type max_len_delete: bool
:param max_len_delete: Do not write events longer than max_len into
report file.
:rtype: List
:return: Nested List of trigger on and of times in samples
"""
# 1) find indices of samples greater than threshold
# 2) calculate trigger "of" times by the gap in trigger indices
# above the threshold i.e. the difference of two following indices
# in ind is greater than 1
# 3) in principle the same as for "of" just add one to the index to get
# start times, this operation is not supported on the compact
# syntax
# 4) as long as there is a on time greater than the actual of time find
# trigger on states which are greater than last of state an the
# corresponding of state which is greater than current on state
# 5) if the signal stays above thres2 longer than max_len an event
# is triggered and following a new event can be triggered as soon as
# the signal is above thres1
ind1 = np.where(charfct > thres1)[0]
if len(ind1) == 0:
return []
ind2 = np.where(charfct > thres2)[0]
#
on = deque([ind1[0]])
of = deque([-1])
# determine the indices where charfct falls below off-threshold
ind2_ = np.empty_like(ind2, dtype=bool)
ind2_[:-1] = np.diff(ind2) > 1
# last occurence is missed by the diff, add it manually
ind2_[-1] = True
of.extend(ind2[ind2_].tolist())
on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist())
# include last pick if trigger is on or drop it
if max_len_delete:
# drop it
of.extend([1e99])
on.extend([on[-1]])
else:
# include it
of.extend([ind2[-1]])
#
pick = []
while on[-1] > of[0]:
while on[0] <= of[0]:
on.popleft()
while of[0] < on[0]:
of.popleft()
if of[0] - on[0] > max_len:
if max_len_delete:
on.popleft()
continue
of.appendleft(on[0] + max_len)
pick.append([on[0], of[0]])
return np.array(pick, dtype=np.int64)
def main():
text_file = open("/home/zeinab/Desktop/STALTA/outFile.txt", "r")
lines = text_file.read().strip().split("\n")
linestofloat = []
for l in lines:
linestofloat.append(float(l))
linestofloat = np.array(linestofloat)
charfct = classic_sta_lta_py(linestofloat, 2, 20)
triggers = trigger_onset(charfct, 4, 2, max_len=9e99,
max_len_delete=False)
for elem1 in triggers:
print(elem1)
print(len(triggers))
if __name__ == '__main__':
main()
import doctest
doctest.testmod(exclude_empty=True)
And here the input text file(dataset):
https://www.dropbox.com/s/wf2cpdlrbwnip14/inputFile.txt?dl=0
<https://www.dropbox.com/s/wf2cpdlrbwnip14/inputFile.txt?dl=0>
some useful commands to run the program with Spark streaming:
Command to load the input text file into a Kafka topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test <
/Users/zeinab/kafka_2.11-1.1.0/bin/inputFile.txt
Command to run the program with Spark streaming:
bin/spark-submit --jars
jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar
examples/src/main/python/streaming/z1kafka.py localhost:2181 test
Command to run the program without Spark streaming printing out the result
in console:
python z1.py
If you have any idea that why i am getting different result with and without
Spark please let me know.
Thank you,
Zeinab
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
---------------------------------------------------------------------
To unsubscribe e-mail: [email protected]