Hello, I'm trying to run the STA/LTA Python code, which I got from the ObsPy website, using Spark Streaming, and to plot the events, but I keep getting the following error:
"java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute" Here is the code: #!/usr/bin/env python # -*- coding: utf-8 -*- # ------------------------------------------------------------------- # Filename: trigger.py # Purpose: Python trigger/picker routines for seismology. # Author: Moritz Beyreuther, Tobias Megies # Email: moritz.beyreut...@geophysik.uni-muenchen.de # # Copyright (C) 2008-2012 Moritz Beyreuther, Tobias Megies # ------------------------------------------------------------------- from __future__ import (absolute_import, division, print_function, unicode_literals) from future.builtins import * # NOQA from collections import deque import ctypes as C import warnings import numpy as np from obspy import UTCDateTime from obspy.signal.cross_correlation import templates_max_similarity from obspy.signal.headers import clibsignal, head_stalta_t from numpy import genfromtxt import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext def classic_sta_lta_py(a, nsta, nlta): """ Computes the standard STA/LTA from a given input array a. The length of the STA is given by nsta in samples, respectively is the length of the LTA given by nlta in samples. Written in Python. .. note:: There exists a faster version of this trigger wrapped in C called :func:`~obspy.signal.trigger.classic_sta_lta` in this module! 
:type a: NumPy :class:`~numpy.ndarray` :param a: Seismic Trace :type nsta: int :param nsta: Length of short time average window in samples :type nlta: int :param nlta: Length of long time average window in samples :rtype: NumPy :class:`~numpy.ndarray` :return: Characteristic function of classic STA/LTA """ # The cumulative sum can be exploited to calculate a moving average (the # cumsum function is quite efficient) sta = np.cumsum(a ** 2) # Convert to float sta = np.require(sta, dtype=np.float) # Copy for LTA lta = sta.copy() # Compute the STA and the LTA sta[nsta:] = sta[nsta:] - sta[:-nsta] sta /= nsta lta[nlta:] = lta[nlta:] - lta[:-nlta] lta /= nlta # Pad zeros sta[:nlta - 1] = 0 # Avoid division by zero by setting zero values to tiny float dtiny = np.finfo(0.0).tiny idx = lta < dtiny lta[idx] = dtiny return sta / lta def trigger_onset(charfct, thres1, thres2, max_len=9e99, max_len_delete=False): """ Calculate trigger on and off times. Given thres1 and thres2 calculate trigger on and off times from characteristic function. This method is written in pure Python and gets slow as soon as there are more then 1e6 triggerings ("on" AND "off") in charfct --- normally this does not happen. :type charfct: NumPy :class:`~numpy.ndarray` :param charfct: Characteristic function of e.g. STA/LTA trigger :type thres1: float :param thres1: Value above which trigger (of characteristic function) is activated (higher threshold) :type thres2: float :param thres2: Value below which trigger (of characteristic function) is deactivated (lower threshold) :type max_len: int :param max_len: Maximum length of triggered event in samples. A new event will be triggered as soon as the signal reaches again above thres1. :type max_len_delete: bool :param max_len_delete: Do not write events longer than max_len into report file. 
:rtype: List :return: Nested List of trigger on and of times in samples """ # 1) find indices of samples greater than threshold # 2) calculate trigger "of" times by the gap in trigger indices # above the threshold i.e. the difference of two following indices # in ind is greater than 1 # 3) in principle the same as for "of" just add one to the index to get # start times, this operation is not supported on the compact # syntax # 4) as long as there is a on time greater than the actual of time find # trigger on states which are greater than last of state an the # corresponding of state which is greater than current on state # 5) if the signal stays above thres2 longer than max_len an event # is triggered and following a new event can be triggered as soon as # the signal is above thres1 ind1 = np.where(charfct > thres1)[0] if len(ind1) == 0: return [] ind2 = np.where(charfct > thres2)[0] # on = deque([ind1[0]]) of = deque([-1]) # determine the indices where charfct falls below off-threshold ind2_ = np.empty_like(ind2, dtype=bool) ind2_[:-1] = np.diff(ind2) > 1 # last occurence is missed by the diff, add it manually ind2_[-1] = True of.extend(ind2[ind2_].tolist()) on.extend(ind1[np.where(np.diff(ind1) > 1)[0] + 1].tolist()) # include last pick if trigger is on or drop it if max_len_delete: # drop it of.extend([1e99]) on.extend([on[-1]]) else: # include it of.extend([ind2[-1]]) # pick = [] while on[-1] > of[0]: while on[0] <= of[0]: on.popleft() while of[0] < on[0]: of.popleft() if of[0] - on[0] > max_len: if max_len_delete: on.popleft() continue of.appendleft(on[0] + max_len) pick.append([on[0], of[0]]) return np.array(pick, dtype=np.int64) def plot_trigger(a, cft, thr_on, thr_off, show=True): """ Plot characteristic function of trigger along with waveform data and trigger On/Off from given thresholds. 
:type trace: :class:`~obspy.core.trace.Trace` :param trace: waveform data :type cft: :class:`numpy.ndarray` :param cft: characteristic function as returned by a trigger in :mod:`obspy.signal.trigger` :type thr_on: float :param thr_on: threshold for switching trigger on :type thr_off: float :param thr_off: threshold for switching trigger off :type show: bool :param show: Do not call `plt.show()` at end of routine. That way, further modifications can be done to the figure before showing it. """ import matplotlib.pyplot as plt #df = trace.stats.sampling_rate df = 200 #npts = trace.stats.npts npts = 36000 t = np.arange(npts, dtype=np.float32) / df fig = plt.figure() ax1 = fig.add_subplot(211) ax1.plot(t, a, 'k') ax2 = fig.add_subplot(212, sharex=ax1) ax2.plot(t, cft, 'k') on_off = np.array(trigger_onset(cft, thr_on, thr_off)) #on_off = np.array(triggers) i, j = ax1.get_ylim() try: ax1.vlines(on_off[:, 0] / df, i, j, color='r', lw=2, label="Trigger On") ax1.vlines(on_off[:, 1] / df, i, j, color='b', lw=2, label="Trigger Off") ax1.legend() except IndexError: pass ax2.axhline(thr_on, color='red', lw=1, ls='--') ax2.axhline(thr_off, color='blue', lw=1, ls='--') starttime = "2005-10-06T07:21:59.850000Z" #ax2.set_xlabel("Time after %s [s]" % starttime.isoformat()) ax2.set_xlabel("Time after %s" % starttime) fig.suptitle(".RJOB..Z") fig.canvas.draw() if show: plt.show() if __name__ == '__main__': sc = SparkContext("local[18]", "STALTA") ssc = StreamingContext(sc, 5) data = sc.textFile("file:///home/zeinab/spark-2.3.1-bin-hadoop2.7/inputFile.txt") floatelems = data.flatMap(lambda line: line.strip().split("\n")).map(lambda strelem: float(strelem)) stalta_ratio = floatelems.map(lambda i: classic_sta_lta_py(i, 2, 30)) result = floatelems.flatMap(lambda j: plot_trigger(j, stalta_ratio ,4, 2, show=True)) #result.show() ssc.start() ssc.awaitTermination() I also tried this as to call an action to be able to see the plot: result.show() but I got this error: AttributeError: 
'PipelinedRDD' object has no attribute 'show'. The input text file is available through the link below: https://www.dropbox.com/s/wf2cpdlrbwnip14/inputFile.txt?dl=0 I'd greatly appreciate your help! Thank you, Zeinab -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org