home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''
- This module contains operations to convert sound files to WAV and to
- retrieve their metadata.
- '''
if __name__ == '__main__':
    # When executed as a script, request the 0.10 bindings through pygst
    # before ``gst`` itself is imported further down.  A missing pygst is
    # tolerated so the plain ``import gst`` can still be attempted.
    # NOTE(review): the decompiler output placed this block under the
    # ``__main__`` guard; confirm against the original source if available.
    try:
        import pygst
        pygst.require('0.10')
    except ImportError:
        pass
- import threading
- import gst
- import gobject
- import operations
- GVFS_SRC = 'gnomevfssrc'
- FILE_SRC = 'filesrc'
-
class ElementNotFoundError(KeyError):
    """Raised when a requested GStreamer element could not be created."""
-
if gst.gst_version >= (0, 9):
    # From 0.9 onwards the binding's factory function is used unchanged.
    safe_element_factory_make = gst.element_factory_make
else:
    def safe_element_factory_make(*args, **kwargs):
        """Create an element, turning a ``None`` result into an exception.

        All arguments are forwarded to ``gst.element_factory_make``.

        Raises:
            ElementNotFoundError: carrying the positional arguments, when
                the factory returned ``None``.
        """
        created = gst.element_factory_make(*args, **kwargs)
        if created is not None:
            return created
        raise ElementNotFoundError(args)
-
class GstPlayingFailledError(StandardError):
    """Raised when the pipeline refuses to switch to the PLAYING state."""
-
-
class GstOperationListener(operations.OperationListener):
    """Listener for GStreamer operations; every hook defaults to a no-op."""

    def on_tag(self, event, tag):
        """Hook called with the tag data when a tag is found."""

    def on_eos(self, event):
        """Hook called when the pipeline reaches end-of-stream (EOS)."""

    def on_error(self, event, error):
        """Hook called with the error when one occurs."""
-
-
-
class GstPipelineOperation(operations.MeasurableOperation):
    """GStreamer pipeline operation.

    Drives a pipeline (exposed as ``bin``) through its life cycle and
    reports progress as ``position / duration`` of ``query_element``.
    Subclasses supply the version-specific ``query_duration`` and
    ``query_position`` and route pipeline notifications to ``_on_eos``,
    ``_on_error`` and ``_on_tag``.
    """

    # Read-only views on the private state set up in __init__.
    can_start = property(lambda self: self.__can_start)
    running = property(lambda self: self.__running)
    bin = property(lambda self: self.__bin)
    # Element whose position/duration is queried; may be assigned later.
    query_element = None
    # Fraction of the work done, always within [0, 1].
    progress = property(lambda self: self._get_progress())

    def __init__(self, query_element, pipeline):
        """Store the pipeline and the element used for progress queries."""
        super(GstPipelineOperation, self).__init__()
        self.query_element = query_element
        self.__bin = pipeline
        self.__progress = 0.0
        self.__can_start = True
        self.__running = False
        self.__duration = 0

    def start(self):
        """Switch the pipeline to PLAYING.

        Raises:
            AssertionError: if the operation was already started.
            GstPlayingFailledError: if ``set_state`` reports a falsy result.
        """
        if not self.can_start:
            raise AssertionError('operation can only be started once')
        if not self.bin.set_state(gst.STATE_PLAYING):
            raise GstPlayingFailledError()
        self.__can_start = False
        self.__running = True

    def stop(self):
        """Abort the operation."""
        self._finalize(operations.ABORTED)

    def query_duration(self, format=gst.FORMAT_BYTES):
        """Return the total duration (0.0 here; overridden by subclasses)."""
        return 0.0

    def query_position(self, format=gst.FORMAT_BYTES):
        """Return the current position (0 here; overridden by subclasses)."""
        return 0

    def _finalize(self, event_id, error=None):
        """Reset the pipeline and emit the finished event, once.

        Does nothing unless the operation is currently running.
        """
        if self.__running:
            self.bin.set_state(gst.STATE_NULL)
            self.__running = False
            self._send_finished_event(event_id, error)

    def _get_progress(self):
        """Return the monotonically increasing progress within [0, 1]."""
        if self.query_element and self.__progress < 1:
            # A non-positive duration means "not known yet": ask again
            # rather than caching a value that can never yield progress.
            if self.__duration <= 0:
                self.__duration = self.query_duration()

            if self.__duration <= 0:
                fraction = 0
            else:
                # float() guards against integer division should a
                # subclass return an integral duration.
                fraction = self.query_position() / float(self.__duration)

            # Never move backwards and never report more than completion.
            self.__progress = min(1.0, max(self.__progress, fraction))

        return self.__progress

    def _on_eos(self):
        """Notify listeners of end-of-stream, then finish successfully."""
        event = operations.Event(self)
        self._notify('on_eos', event)
        self._finalize(operations.SUCCESSFUL)

    def _on_error(self, error):
        """Notify listeners of *error*, then finish with ERROR."""
        event = operations.Event(self)
        self._notify('on_error', event, error)
        self._finalize(operations.ERROR, error)

    def _on_tag(self, taglist):
        """Forward *taglist* to the listeners' ``on_tag`` hook."""
        event = operations.Event(self)
        self._notify('on_tag', event, taglist)
-
-
-
class Gst08Operation(GstPipelineOperation):
    """Implement GstPipelineOperation with the GStreamer 0.8 API."""

    def __init__(self, query_element=None, pipeline=None, use_threads=False):
        """Create the operation, building a default pipeline if needed.

        When *pipeline* is None a ``gst.Thread`` is created if
        *use_threads* is true, otherwise a ``gst.Pipeline``.
        """
        if pipeline is None:
            # The decompiled code built a gst.Thread and threw it away,
            # always falling back to gst.Pipeline; honour the flag instead.
            if use_threads:
                pipeline = gst.Thread()
            else:
                pipeline = gst.Pipeline()

        super(Gst08Operation, self).__init__(query_element, pipeline)
        self.__use_threads = use_threads
        pipeline.connect('found-tag', self._on_tag)
        pipeline.connect('error', self._on_error)
        pipeline.connect('eos', self._on_eos)
        # Id of the idle source that iterates the pipeline, if any.
        self.__source = None

    def start(self):
        """Start the pipeline; without threads, iterate it from idle."""
        super(Gst08Operation, self).start()
        if self.running and not self.__use_threads:
            self.__source = gobject.idle_add(self.bin.iterate)

    def stop(self):
        """Remove the idle iteration source, then abort the operation."""
        self._remove_idle_source()
        super(Gst08Operation, self).stop()

    def query_duration(self, format=gst.FORMAT_TIME):
        """Return the total duration of ``query_element`` as a float."""
        return float(self.query_element.query(gst.QUERY_TOTAL, format))

    def query_position(self, format=gst.FORMAT_TIME):
        """Return the current position of ``query_element``."""
        return self.query_element.query(gst.QUERY_POSITION, format)

    def _on_error(self, pipeline, element, error, user_data=None):
        """Signal adapter: keep only the error for the base handler."""
        super(Gst08Operation, self)._on_error(error)

    def _on_tag(self, pipeline, element, taglist):
        """Signal adapter: keep only the tag list for the base handler."""
        super(Gst08Operation, self)._on_tag(taglist)

    def _on_eos(self, pipeline):
        """Signal adapter for the pipeline's 'eos' signal."""
        super(Gst08Operation, self)._on_eos()

    def _finalize(self, event_id, error=None):
        """Finish the operation and drop the idle iteration source."""
        super(Gst08Operation, self)._finalize(event_id, error)
        self._remove_idle_source()

    def _remove_idle_source(self):
        """Unregister the idle iteration source if one is installed."""
        if self.__source is not None:
            gobject.source_remove(self.__source)
            self.__source = None
-
-
-
-
class Gst09Operation(GstPipelineOperation):
    """Implement GstPipelineOperation with the GStreamer 0.9/0.10 API."""

    # Shadows the base property with this class's own lock-guarded flag.
    running = property(lambda self: self.__running)

    def __init__(self, query_element=None, pipeline=None):
        """Create the operation and watch the pipeline's bus for messages."""
        if pipeline is None:
            pipeline = gst.Pipeline()

        super(Gst09Operation, self).__init__(query_element, pipeline)
        self.bus = pipeline.get_bus()
        self.bus.add_watch(self._dispatch_bus_message)
        self.__running = False
        # Guards the running flag in start() and _finalize().
        self.lock = threading.RLock()

    def query_duration(self, format=gst.FORMAT_TIME):
        """Return the total duration as a float, or 0.0 if the query fails."""
        try:
            total, _unused = self.query_element.query_duration(format)
        except gst.QueryError:
            return 0.0
        return float(total)

    def query_position(self, format=gst.FORMAT_TIME):
        """Return the current position, or 0 if the query fails."""
        try:
            position, _unused = self.query_element.query_position(format)
        except gst.QueryError:
            return 0
        return position

    def start(self):
        """Start the pipeline unless this operation is already running.

        If the base ``start`` raises, the running flag is rolled back so
        the object does not claim to be running a pipeline that never
        started; the exception is re-raised unchanged.
        """
        self.lock.acquire()
        try:
            if self.__running:
                return
            self.__running = True
            try:
                super(Gst09Operation, self).start()
            except Exception:
                self.__running = False
                raise
        finally:
            self.lock.release()

    def _dispatch_bus_message(self, bus, message):
        """Route a bus message to ``_on_<message type nick>`` if defined.

        Always returns True (presumably to keep the bus watch installed --
        verify against the binding's ``add_watch`` documentation).
        """
        handler = getattr(self, '_on_' + message.type.first_value_nick, None)
        if handler:
            handler(bus, message)
        return True

    def _finalize(self, event_id, error=None):
        """Clear the running flag and schedule the base finalisation.

        The base ``_finalize`` is deferred through ``gobject.idle_add``
        rather than run inline; the wrapper returns False so that it is
        only invoked once.
        """
        self.lock.acquire()
        try:
            if self.running:
                self.__running = False

                def wrapper():
                    super(Gst09Operation, self)._finalize(event_id, error)
                    return False

                gobject.idle_add(wrapper)
        finally:
            self.lock.release()

    def _on_eos(self, bus, message):
        """Bus adapter for end-of-stream messages."""
        super(Gst09Operation, self)._on_eos()

    def _on_error(self, bus, message):
        """Bus adapter: hand the parsed error to the base handler."""
        super(Gst09Operation, self)._on_error(message.parse_error())

    def _on_tag(self, bus, message):
        """Bus adapter: hand the parsed tag list to the base handler."""
        super(Gst09Operation, self)._on_tag(message.parse_tag())
-
-
# Select the operation class and the "new pad" signal name for the installed
# bindings.  The test is the same tuple comparison used when choosing
# safe_element_factory_make, so both switches always agree on which API
# generation is in use.
if gst.gst_version >= (0, 9):
    NEW_PAD_SIGNAL = 'pad-added'
    GstOperation = Gst09Operation
else:
    NEW_PAD_SIGNAL = 'new-pad'
    GstOperation = Gst08Operation
-
def create_source(source, location, src_prop='location'):
    """Create the element named *source* and point it at *location*.

    Args:
        source: factory name passed to ``safe_element_factory_make``.
        location: value assigned to the element's *src_prop* property.
        src_prop: name of the property that receives *location*.

    Returns:
        The configured element.
    """
    element = safe_element_factory_make(source)
    element.set_property(src_prop, location)
    return element
-
-
class AudioMetadataListener(operations.OperationListener):
    """
    Listener for metadata retrieval.

    The on_metadata event is called before the FinishedEvent, if the
    metadata retrieval is successful.
    """

    def on_metadata(self, event, metadata):
        """Hook called with the retrieved metadata; a no-op by default."""
-
-
-
class AudioMetadataEvent(operations.Event):
    """Event that holds the audio metadata."""

    def __init__(self, source, id, metadata):
        """Initialise the event.

        Args:
            source: object that emitted the event.
            id: identifier associated with the event.
            metadata: the metadata being delivered.
        """
        # The base initialiser used to be called as ``__init__(source, id)``,
        # i.e. without ``self``, which cannot work.  Everywhere else in this
        # module ``operations.Event`` is built from the source alone, so only
        # the source is forwarded and the id is kept on this class.
        # NOTE(review): confirm operations.Event's signature.
        operations.Event.__init__(self, source)
        self.__id = id
        self.__metadata = metadata

    id = property(lambda self: self.__id)
    metadata = property(lambda self: self.__metadata)
-
-
class AudioMetadata(operations.Operation, GstOperationListener):
    """Returns the metadata associated with the source element.

    To retrieve the metadata associated with a certain media file on gst-launch -t:
    source ! decodebin ! fakesink

    Tags reported by the wrapped operation are accumulated; once it
    finishes, listeners receive ``on_metadata`` with the collected dict
    (``duration`` in whole seconds) followed by the finished event.
    """

    can_start = property(lambda self: self.__oper.can_start)
    running = property(lambda self: self.__oper.running)

    def __init__(self, source):
        """Build the decodebin/fakesink pipeline fed by *source*."""
        super(AudioMetadata, self).__init__()
        pipeline = gst.parse_launch('decodebin name=am_decodebin ! fakesink name=am_fakesink')
        self.__oper = GstOperation(pipeline=pipeline)
        pipeline.add(source)
        source.link(pipeline.get_by_name('am_decodebin'))
        self._fakesink = pipeline.get_by_name('am_fakesink')
        # The first buffer reaching the sink triggers on_handoff.
        self._fakesink.set_property('signal-handoffs', True)
        self._fakesink.connect('handoff', self.on_handoff)
        self.__oper.query_element = self._fakesink
        self.__oper.listeners.append(self)
        self.__metadata = {}
        self.__element = None

    def start(self):
        """Start the wrapped pipeline operation."""
        self.__oper.start()

    def stop(self):
        """Record the duration if still missing, then stop the pipeline."""
        self._check_duration()
        self.__oper.stop()

    def on_eos(self, event):
        """Make sure a duration is recorded when the stream ends."""
        self._check_duration()

    def on_error(self, event, message):
        """Make sure a duration is recorded when an error is reported."""
        self._check_duration()

    def on_handoff(self, *ignored):
        """Stop on the first buffer and disable further handoff signals."""
        self._fakesink.set_property('signal-handoffs', False)
        self.stop()

    def on_tag(self, event, taglist):
        """Merge *taglist* into the collected metadata."""
        # The metadata is released once delivered; late tags are dropped
        # instead of failing on None.
        if self.__metadata is not None:
            self.__metadata.update(taglist)

    def on_finished(self, event):
        """Deliver the metadata, or an ERROR finish if it is unusable.

        An ERROR event from the wrapped operation is propagated as is.
        A missing, zero or -1 duration (in whole seconds) also finishes
        with ERROR.
        """
        if event.id == operations.ERROR:
            self._propagate(event)
            return None

        try:
            # Explicit floor division: whole seconds.
            duration = int(self.__metadata['duration']) // gst.SECOND
        except KeyError:
            duration = 0

        if duration in (-1, 0):
            self._send_finished_event(operations.ERROR)
            return None

        self.__metadata['duration'] = duration
        evt = operations.Event(self)
        self._notify('on_metadata', evt, self.__metadata)
        self.__metadata = None
        self.__element = None
        self._send_finished_event(operations.SUCCESSFUL)

    def _check_duration(self):
        """Query and store the duration unless a tag already supplied it."""
        metadata = self.__metadata
        if metadata is None:
            # Already delivered by on_finished: nothing left to complete.
            # Without this guard a stop() after completion raised.
            return
        if 'duration' not in metadata:
            metadata['duration'] = self.__oper.query_duration(gst.FORMAT_TIME)
-
-
-
-
def get_metadata(source, location):
    """Return an AudioMetadata operation for *location*.

    *source* is the element factory name used to build the source element.
    """
    src_element = create_source(source, location)
    return AudioMetadata(src_element)
-
- WavPcmStruct = {
- 'rate': 44100,
- 'signed': True,
- 'channels': 2,
- 'width': 16,
- 'depth': 16,
- 'endianness': 1234 }
- _WAV_PCM_PARSE = 'audio/x-raw-int, endianness=(int)1234, width=(int)16, depth=(int)16, signed=(boolean)true, rate=(int)44100, channels=(int)2'
-
- def is_caps_wav_pcm(caps):
- struct = caps[0]
- if not struct.get_name() == 'audio/x-raw-int':
- return False
-
- for key, value in WavPcmStruct.iteritems():
- if not struct.has_field(key) or struct[key] != value:
- return False
- continue
-
- return True
-
-
class IsWavPcm(operations.Operation, GstOperationListener):
    """
    Tests if a certain WAV is in the PCM format.

    The operation finishes SUCCESSFUL when the pad created by ``wavparse``
    carried WAV PCM caps, and with an error otherwise.
    """

    # Both properties delegate to the wrapped pipeline operation.  A second
    # pair of definitions used to follow the methods and override these with
    # lookups of attributes that are never assigned; it has been removed.
    can_start = property(lambda self: self.oper.can_start)
    running = property(lambda self: self.oper.running)

    def __init__(self, source):
        """Build the typefind/wavparse/fakesink pipeline fed by *source*."""
        super(IsWavPcm, self).__init__()
        self.is_wav_pcm = False
        pipeline = gst.parse_launch('typefind name=iwp_typefind ! wavparse name=iwp_wavparse ! ' + _WAV_PCM_PARSE + ' ! fakesink name=iwp_fakesink')
        self.oper = GstOperation(pipeline=pipeline)
        self.oper.listeners.append(self)
        decoder = pipeline.get_by_name('iwp_typefind')
        sink = pipeline.get_by_name('iwp_fakesink')
        self.oper.query_element = sink
        # The first buffer reaching the sink ends the check (see on_handoff).
        sink.set_property('signal-handoffs', True)
        sink.connect('handoff', self.on_handoff)
        self.waveparse = pipeline.get_by_name('iwp_wavparse')
        self.waveparse.connect(NEW_PAD_SIGNAL, self.on_new_pad)
        self.oper.bin.add(source)
        source.link(decoder)

    def on_handoff(self, *args):
        """Stop the pipeline as soon as a buffer reaches the sink."""
        self.oper.stop()

    def on_new_pad(self, src, pad):
        """Record whether the new pad's caps describe WAV PCM."""
        caps = pad.get_caps()
        self.is_wav_pcm = is_caps_wav_pcm(caps)

    def on_finished(self, event):
        """Translate the pipeline's finished event into this operation's.

        * not an error and PCM was detected -> SUCCESSFUL
        * pipeline SUCCESSFUL but no PCM    -> ERROR 'Not a valid WAV PCM'
        * anything else                     -> forwarded id and error
        """
        if event.id != operations.ERROR and self.is_wav_pcm:
            # Stopping from on_handoff yields ABORTED, so both outcomes are
            # expected here.  The decompiled check was inverted and raised
            # on exactly that path.
            assert event.id in (operations.SUCCESSFUL, operations.ABORTED)
            self._send_finished_event(operations.SUCCESSFUL)
            return

        if event.id == operations.SUCCESSFUL:
            eid = operations.ERROR
            err = StandardError('Not a valid WAV PCM')
        else:
            eid = event.id
            err = event.error
        self._send_finished_event(eid, err)

    def start(self):
        """Start the wrapped pipeline operation."""
        self.oper.start()

    def stop(self):
        """Stop the wrapped pipeline operation."""
        self.oper.stop()
-
-
def is_wav_pcm(source, location):
    """Return an IsWavPcm operation that checks the file at *location*.

    *source* is the element factory name used to build the source element.
    The function is then wrapped by ``operations.operation_factory`` and
    ``operations.async``, in that order.
    """
    src_element = create_source(source, location)
    return IsWavPcm(src_element)


# ``async`` is looked up with getattr so that this line also parses on
# interpreters where ``async`` is a reserved word; the wrapper applied is
# the same ``operations.async``.
is_wav_pcm = getattr(operations, 'async')(operations.operation_factory(is_wav_pcm))
-
@operations.operation_factory
def source_to_wav(source, sink):
    """
    Converts a given source element to wav format and sends it to sink element.

    The pipeline built here is:
    source ! decodebin ! audioconvert ! <WAV PCM caps> ! wavenc ! sink

    Returns the GstOperation driving that pipeline; progress is queried
    on *sink*.
    """
    launch_line = 'decodebin name=stw_decodebin !audioconvert ! ' + _WAV_PCM_PARSE + ' ! wavenc name=stw_wavenc'
    pipeline = gst.parse_launch(launch_line)
    operation = GstOperation(sink, pipeline)
    head = pipeline.get_by_name('stw_decodebin')
    tail = pipeline.get_by_name('stw_wavenc')
    # Attach the caller's endpoints on either side of the parsed chain.
    operation.bin.add(source)
    operation.bin.add(sink)
    source.link(head)
    tail.link(sink)
    return operation
-
@operations.operation_factory
def convert_to_wav(source, source_location, sink_location):
    """
    Utility function that converts the file at source_location to a wav
    file written to sink_location.

    *source* is the element factory name used to read source_location.
    """
    file_sink = safe_element_factory_make('filesink')
    file_sink.set_property('location', sink_location)
    src_element = create_source(source, source_location)
    return source_to_wav(src_element, file_sink)


# ``async`` is looked up with getattr so that this line also parses on
# interpreters where ``async`` is a reserved word; the wrapper applied is
# the same ``operations.async``.
convert_to_wav = getattr(operations, 'async')(convert_to_wav)
# Maps a command name to the factory that builds the matching operation.
commands = dict(
    convert=convert_to_wav,
    is_wav=is_wav_pcm,
    get_metadata=get_metadata,
)


def parse_command(operation, source, source_location, *args):
    """Build the operation registered under the name *operation*.

    *source*, *source_location* and any extra *args* are forwarded to the
    selected factory.  An unknown name raises ``KeyError``.
    """
    factory = commands[operation]
    return factory(source, source_location, *args)
-
if __name__ == '__main__':
    # Command-line driver: COMMAND SOURCE_LOCATION [ARGS...]
    # Runs the selected operation inside a GLib main loop and exits with
    # status 1 unless it finished SUCCESSFUL.
    import sys

    mainloop = gobject.MainLoop()

    class Listener(GstOperationListener):
        """Prints metadata/progress and stops the main loop when done."""

        def __init__(self, oper):
            self.oper = oper
            # Defined up front so the exit check below never hits a
            # missing attribute if the loop ends without a finished event.
            self.success = False

        def on_metadata(self, event, metadata):
            sys.stderr.write('%s\n' % (metadata,))

        def on_finished(self, event):
            self.success = operations.SUCCESSFUL == event.id
            mainloop.quit()

        def on_progress(self):
            sys.stdout.write('%s\n' % (self.oper.progress,))
            # Returning True keeps the timeout scheduled.
            return True

    if len(sys.argv) < 3:
        sys.stderr.write('usage: %s COMMAND SOURCE_LOCATION [ARGS...]\n' % sys.argv[0])
        sys.exit(2)

    f = parse_command(sys.argv[1], FILE_SRC, sys.argv[2], *sys.argv[3:])
    l = Listener(f)
    if isinstance(f, operations.MeasurableOperation):
        # Report progress every 200 ms.
        gobject.timeout_add(200, l.on_progress)

    f.listeners.append(l)
    f.start()
    mainloop.run()
    if not l.success:
        sys.exit(1)
-
-
-