Changeset 853
- Timestamp:
- 06/02/07 06:58:42 (2 years ago)
- Files:
-
- 1 modified
-
trunk/jahtools/jbatch/jbatch_server.py (modified) (16 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/jahtools/jbatch/jbatch_server.py
r848 r853 26 26 conform = [ ] 27 27 rpn = [ ] 28 root = "" 28 29 29 30 def __init__( self ): 30 31 self.queue = [ ] 31 32 self.rpn = jstack.stack( ) 33 34 # Register all commands and overrides 32 35 self.rpn.register( "start", self.execute, "Specify start points for playback (<video> start)" ) 33 36 self.rpn.register( "current", self.current, "Place a clone of the current video on the stack" ) 34 37 self.rpn.register( "checkpoint", self.checkpoint, "Specify the cut off point for inclusion in current (<video> checkpoint <filter> start)" ) 35 38 self.rpn.register( "sync", self.sync, "Sync the top of stack position with the current video frame position (current <filter> sync start)" ) 39 self.rpn.register( "shutdown", self.shutdown, "Shutdown the server" ) 40 self.rpn.register( "play", self.play, "Set play state on current video" ) 41 self.rpn.register( "pause", self.pause, "Set pause state on current video" ) 42 self.rpn.register( "toggle", self.toggle, "Toggle pause/play state on current video" ) 43 self.rpn.register( "stop", self.stop, "Terminate current video - next queued media is scheduled/paused" ) 44 self.rpn.register( "ffwd", self.ffwd, "Set fast forward state on current video" ) 45 self.rpn.register( "rew", self.rew, "Set rewind state on current video" ) 46 self.rpn.register( "seek", self.seek, "Seek to an absolute or relative frame ([+/-]n seek)" ) 47 self.rpn.register( "apply", self.apply, "Update a named property on a filter graph component (var.property=value apply)" ) 48 self.rpn.register( "drop_all", self.drop_all, "Clear queue and stack" ) 49 self.rpn.register( ".", self.do, "Immediate stop and play (<media> .)" ) 50 self.rpn.register( "ls", self.ls, "List contents of a directory (<directory> ls)" ) 51 self.rpn.register( "rls", self.rls, "Recursively list contents of a directory and subdirectories (<dir> <depth> rls)" ) 52 self.rpn.register( "tree", self.tree, "Recursively list subdirectories of a directory (<dir> <depth> tree)" ) 53 self.rpn.register( "describe", self.describe, "Describe the current filter graph" ) 54 self.rpn.register( "info", self.send_info, "Return current play state information" ) 55 self.rpn.register( "speed", self.set_speed, "Set the playout speed on the current media ([+/-]n speed)" ) 56 self.rpn.register( "help", self.help, "This information" ) 57 self.rpn.register( "queue", self.query_queue, "Query the queue" ) 58 self.rpn.register( "stack", self.query_stack, "Query the stack" ) 59 self.rpn.register( "functions", self.query_functions, "Query function definitions" ) 60 self.rpn.register( "filters", self.query_filters, "Query filter definitions" ) 61 self.rpn.register( "next", self.next, "Play next in queue (if queue not empty)" ) 62 63 # Provide some base functionality 36 64 self.rpn.push( "jstack.oml" ) 65 self.rpn.push_args( rpn_engine.rpn ) 66 37 67 # Two conditions and locks used here 38 68 # mutex/not_empty is used to handle the queue 39 69 self.mutex = threading.Lock( ) 40 70 self.not_empty = threading.Condition( self.mutex ) 71 41 72 # notify/notification is used to handle the status sessions 42 73 self.notify = threading.Lock( ) … … 54 85 self.current_input = [ ] 55 86 self.input = None 56 self.dict = { }57 self.help = { }58 self.register( "shutdown", self.shutdown, "Shutdown the server" )59 self.register( "play", self.play, "Set play state on current video" )60 self.register( "pause", self.pause, "Set pause state on current video" )61 self.register( "toggle", self.toggle, "Toggle pause/play state on current video" )62 self.register( "stop", self.stop, "Terminate current video - next queued command is processed" )63 self.register( "ffwd", self.ffwd, "Set fast forward state on current video" )64 self.register( "rew", self.rew, "Set rewind state on current video" )65 self.register( "seek", self.seek, "Seek to an absolute or relative frame ([+/-]n seek)" )66 self.register( "apply", self.apply, "Update a named property on a filter graph component (var.property=value apply)" )67 self.register( "drop_all", self.drop_all, "Clear queue and stack" )68 self.register( ".", self.do, "Immediate stop and play (<media> .)" )69 self.register( "ls", self.ls, "List contents of a directory (<directory> ls)" )70 self.register( "rls", self.rls, "Recursively list contents of a directory and subdirectories (<dir> <depth> rls)" )71 self.register( "tree", self.tree, "Recursively list subdirectories of a directory (<dir> <depth> tree)" )72 self.register( "describe", self.describe, "Describe the current filter graph" )73 self.register( "info", self.send_info, "Return current play state information" )74 self.register( "speed", self.set_speed, "Set the playout speed on the current media ([+/-]n speed)" )75 self.register( "help", self.helper, "This information" )76 self.register( "help_rpn", self.helper_rpn, "RPN related help" )77 self.register( "queue", self.query_queue, "Query the queue" )78 self.register( "stack", self.query_stack, "Query the stack" )79 self.register( "functions", self.query_functions, "Query function definitions" )80 81 def register( self, name, command, help ):82 self.dict[ name ] = command83 self.help[ name ] = help84 87 85 88 def run( self ): 86 self.rpn.push_args( rpn_engine.rpn )87 89 self.not_empty.acquire( ) 88 90 while self.running: 89 91 while self.running and len( self.queue ) > 0: 90 arg = self.queue.pop( 0 ) 92 self.id += 1 93 pos = self.position 94 input = self.queue.pop( 0 ) 91 95 self.not_empty.release( ) 92 try: 93 self.rpn.push( arg ) 94 except Exception, e: 95 print str( e ) 96 if not self.checked: 97 self.del_current( ) 98 self.set_current( input ) 99 self.checked = False 100 self.rpn.push_input( input ) 101 self.rpn.push_args( rpn_engine.conform ) 102 modified = self.rpn.pop( ) 103 if self.sync_request and pos < modified.get_frames( ): 104 modified.seek( pos, False ) 105 self.sync_request = False 106 self.preview( modified ) 107 if not self.playing or ( modified.get_position( ) >= modified.get_frames( ) - 1 and not self.rpn.empty( ) ): 108 if modified.get_position( ) >= modified.get_frames( ) - 1: 109 input.seek( 0, False ) 110 self.rpn.deposit( modified ) 111 else: 112 self.rpn.push_input( input ) 113 self.playing = False 96 114 self.not_empty.acquire( ) 97 115 if self.running and len( self.queue ) == 0: 98 116 self.not_empty.wait( ) 99 117 self.not_empty.release( ) 100 101 def push( self, arg, client = None ):102 self.not_empty.acquire( )103 if arg in self.dict.keys( ):104 try:105 self.dict[ arg ]( socket = client )106 except Exception, e:107 print str( e )108 else:109 self.queue.append( arg )110 self.not_empty.release( )111 return self.running112 118 113 119 def inform( self ): … … 142 148 143 149 def set_speed( self, **args ): 144 tos = self. queue.pop( )150 tos = self.rpn.pop( ).get_uri( ) 145 151 self.play( int( tos ) ) 146 152 147 153 def seek( self, **args ): 148 tos = self. queue.pop( )154 tos = self.rpn.pop( ).get_uri( ) 149 155 if tos[ 0 ] in "+-": 150 156 tos = self.position + int( tos ) … … 158 164 def do( self, **args ): 159 165 self.stop( ) 160 self. queue.append( "start")166 self.rpn.push( "start", **args ) 161 167 self.play( ) 168 169 def next( self, **args ): 170 if len( self.queue ) > 0: 171 self.stop( ) 172 self.play( ) 162 173 163 174 def dump( self, input, **args ): … … 170 181 171 182 uri = balance.retrieve_uri( input ) 183 184 if uri.startswith( rpn_engine.root ): 185 uri = uri[ len( rpn_engine.root ) : ] 186 172 187 if " " in uri and not uri.startswith( "\"" ): 173 188 result += "\"" + uri +"\"" … … 192 207 def describe( self, **args ): 193 208 args[ "socket" ].sendall( self.dump( self.input ) ) 194 return ""195 209 196 210 def sync( self, **args ): … … 198 212 199 213 def apply( self, **args ): 200 tos = self. queue.pop( )214 tos = self.rpn.pop( ).get_uri( ) 201 215 pair = tos.split( ".", 1 ) 202 216 if pair[ 0 ] in self.rpn.locals[ -1 ].keys( ): 203 217 filter = self.rpn.locals[ -1 ][ pair[ 0 ] ] 204 218 pair = pair[ 1 ].split( "=", 1 ) 205 properties.update_prop( filter, pair[ 0 ], pair[ 1 ])219 properties.update_prop( filter, str( pair[ 0 ] ), str( pair[ 1 ] ) ) 206 220 207 221 def drop_all( self, **args ): … … 236 250 self.set_current( input ) 237 251 238 def execute( self ):239 if not self. playing and not self.rpn.empty( ):252 def execute( self, **args ): 253 if not self.rpn.empty( ): 240 254 try: 241 self.id += 1242 pos = self.position243 255 input = self.rpn.pop( ) 244 256 if input is not None: 245 if not self.checked: 246 self.del_current( ) 247 self.set_current( input ) 248 self.checked = False 249 self.rpn.push_input( input ) 250 self.rpn.push_args( rpn_engine.conform ) 251 modified = self.rpn.pop( ) 252 if self.sync_request and pos < modified.get_frames( ): 253 modified.seek( pos, False ) 254 self.sync_request = False 255 self.preview( modified ) 256 if not self.playing or ( modified.get_position( ) >= modified.get_frames( ) - 1 and not self.rpn.empty( ) ): 257 if modified.get_position( ) >= modified.get_frames( ) - 1: 258 input.seek( 0, False ) 259 self.rpn.deposit( modified ) 260 else: 261 self.rpn.push_input( input ) 262 self.playing = False 257 self.not_empty.acquire( ) 258 self.queue.append( input ) 259 self.not_empty.release( ) 260 263 261 except Exception, e: 264 262 print str( e ) … … 338 336 339 337 def ls( self, **args ): 340 dir = self. queue.pop( )338 dir = self.rpn.pop( ).get_uri( ) 341 339 result = "" 342 340 if os.access( dir, os.R_OK ): 343 341 for entry in os.listdir( dir ): 344 full = os.path.join( dir, entry ) 345 if os.path.isdir( full ): 346 result += "\"%s/\"\n" % entry 347 else: 348 result += "\"%s\"\n" % entry 342 try: 343 full = os.path.join( dir, str( entry ) ) 344 if os.path.isdir( full ): 345 result += "\"%s/\"\n" % str( entry ) 346 else: 347 result += "\"%s\"\n" % str( entry ) 348 except Exception: 349 # TODO: utf8 handling 350 print "rejecting", entry 349 351 args[ "socket" ].sendall( result ) 350 352 351 353 def rls( self, files = True, **args ): 352 354 socket = args[ "socket" ] 353 depth = int( self. queue.pop( ) )354 dir = self. queue.pop( )355 socket.sendall( "\"%s\"\n" % dir )355 depth = int( self.rpn.pop( ).get_uri( ) ) 356 dir = self.rpn.pop( ).get_uri( ) 357 socket.sendall( "\"%s\"\n" % dir[ len( rpn_engine.root ) : ] ) 356 358 if os.access( dir, os.R_OK ): 357 359 contents = os.listdir( dir ) … … 368 370 full = os.path.join( dir, entry ) 369 371 if os.path.isdir( full ): 370 self. queue.append( full )371 self. queue.append( depth )372 self.rpn.push( full ) 373 self.rpn.push( depth ) 372 374 self.rls( files, **args ) 373 375 else: … … 380 382 self.rls( False, args ) 381 383 382 def helper( self, **args ): 383 words = self.dict.keys( ) 384 words.sort( ) 385 for n in words: 386 args[ "socket" ].sendall( "%-10.10s - %s\n" % ( n, self.help[ n ] ) ) 387 388 def helper_rpn( self, **args ): 384 def help( self, **args ): 389 385 words = self.rpn.dict.keys( ) 390 386 words.sort( ) 391 387 for n in words: 392 if n in self.rpn.help.keys( ) and n not in self.dict.keys( ):388 if n in self.rpn.help.keys( ): 393 389 args[ "socket" ].sendall( "%-10.10s - %s\n" % ( n, self.rpn.help[ n ] ) ) 394 390 … … 413 409 count += 1 414 410 socket.sendall( "\n;\n\n" ) 411 412 def query_filters( self, **args ): 413 oml_plugins = openpluginlib.discovery( openpluginlib.all_query_traits( "openmedialib", "", "", 0 ) ) 414 for y in oml_plugins: 415 if y.type( ) == "filter": 416 for f in y.extension( ): 417 args[ "socket" ].sendall( f + "\n" ) 415 418 416 419 class rpn_status: … … 469 472 current = current[ 1:-1 ] 470 473 if current != "": 471 tokens.append( current ) 472 current = "" 474 current = urllib.unquote( current ) 475 full = rpn_engine.root + current 476 # Attempts to verify dir/file access 477 if current == ".": 478 tokens.append( current ) 479 elif os.path.isdir( full ): 480 tokens.append( "op:" + full ) 481 elif os.path.exists( full ): 482 tokens.append( full ) 483 elif not os.path.exists( current ): 484 tokens.append( current ) 485 else: 486 # FIXME: security issue with final else 487 tokens.append( current ) 488 current = "" 473 489 if current != "": 474 490 result = "Error - mismatched quotes - ignoring command" … … 488 504 if arg == "bye": 489 505 finished = True 490 elif arg.find( "sdl." ) == 0:491 self.server.engine.rpn.push( arg )492 506 else: 493 finished = not self.server.engine.push( urllib.unquote( arg ),self.client )507 self.server.engine.rpn.push( arg, socket = self.client ) 494 508 if finished: break 495 509 if result != "": … … 535 549 args = line.split( " " ) 536 550 for arg in args: 537 finished = not self.server.engine. push( urllib.unquote( arg ) )551 finished = not self.server.engine.rpn.push( urllib.unquote( arg ) ) 538 552 if finished: break 539 553 if finished: break … … 541 555 self.client.close( ) 542 556 if finished: 543 self.server.engine. push( "shutdown" )557 self.server.engine.rpn.push( "shutdown" ) 544 558 self.server.engine.inform( ) 545 559
