Du bist nicht angemeldet.

#1 13.03.2020 20:40:14

BigLin
Mitglied

Python Mutiprocessing mit mehreren Dataframes

Hallo Zusammen,

ich habe mehrere größere Dataframes mit 3 Mio. - 33 Mio. Einträgen.

Durch das größte Dataframe liegt mein Auslastung des RAMs bei ca. 80%, weshalb ich den Zugriff  auf die Dataframes jedem Kindprozess ermöglichen möchte, statt jeder Prozess die Dataframes eigen einläd.
Dafür habe ich die Lösung nach folgendem StackOverflow-Betiräg umgesetzt:
https://stackoverflow.com/questions/224 … me-between

def startMultiprocessingMultiple(numberOfInstances,function,datasetToSplit,*datasets):

    print("INFO: Multiprocessing with Instances: " + str(numberOfInstances))
    
    part = int(len(datasetToSplit)/numberOfInstances)
    usedData = []
    instances = []
    
    n = 0
    mgr = Manager()
    ns = mgr.Namespace()
    

    while n < len(datasets):
        if n == 0:
            ns.a = datasets[n]
        elif n == 1:
            ns.b = datasets[n]
        elif n  == 2:
            ns.c = datasets[n]
        elif n == 3:
            ns.d = datasets[n]
        n = n + 1


    i = 0
    while i < numberOfInstances:
        usedData = []
        
        if i == 0:
            #print(":"+str(part))
            usedData = datasetToSplit[:part]
        elif i == (numberOfInstances - 1):
            #print(str(part*i)+":")
            usedData = datasetToSplit[(part*i):]
        else:
            #print(str(part*i) +":"+ str(part*(i+1)))
            usedData = datasetToSplit[(part*i):(part*(i+1))]
        
        instances.append(mp.Process(target=function, 
                                    args=(i, usedData,ns)))
        instances[i].start()
        print("INFO: Start Instance "+ str(i) + " with PID: " + str(instances[i].pid))
        
        i = i + 1
        
    for instance in instances:
        instance.join()
        print("INFO: Instance " + str(instance.pid) +" finished")
        
    print("INFO: All Instances finished")

Ausschnitt aus meiner angezielten Funktion:

def mergeBuslinesAndGPS(idInstance,lines,nameSpace):
    busGPS = nameSpace.a
    buslinesGPS = nameSpace.b
    smartCardMetroBus = nameSpace.c

Leider erhalte ich beim jedem ausführen einen - meist - anderen Fehler: OSError, EOFError, BrokenPipeError, Connection reset by peer ...

NFO: Start Enviroment for mergeBuslinesAndGPS
INFO: Multiprocessing with Instances: 2
INFO: Start Instance 0 with PID: 3951
INFO: Start Instance 1 with PID: 3952
Process Process-3:
Traceback (most recent call last):
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/.../Source/Functions/DataPreprocessing/GPS.py", line 21, in mergeBuslinesAndGPS
    busGPS = nameSpace.a
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
    kind, result = conn.recv()
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 385, in _recv
    raise OSError("got end of file during message")
OSError: got end of file during message
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/.../Source/Functions/DataPreprocessing/GPS.py", line 22, in mergeBuslinesAndGPS
    buslinesGPS = nameSpace.b
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/local/anaconda/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
INFO: Instance 3951 finished
INFO: Instance 3952 finished
INFO: All Instances finished

Durch die unterschiedlichen Fehler habe ich leider inzwischen keinen Ansatz mehr, was das Problem sein könnte.
Alle Fehler scheinen ein Problem mit dem Zugriff auf die Dataframes anzudeuten.

Während der Ausführung habe ich keine Operationen, welche die Dataframes verändern.
Es exisiteren "nur" Such-Operationen:

splitDataset = smartCardMetroBus[smartCardMetroBus["Line"] == line]
...
busline = buslinesGPS[buslinesGPS["Line"] == str(line)]

Gibt es Ideen? ... Vorschläge? ... habe ich was "grundsätzliches" falsch gemacht?
Falls ich nur ein Prozess starte, funktioniert alles problemlos roll

Danke für ein Feedback! smile

Liebe Grüße,
BigLin


P.S. mein letzter Login war am 04.09.2010... schon ein bisschen her... schön wieder hier zu sein tongue

Offline

#2 13.03.2020 21:19:53

BigLin
Mitglied

Re: Python Mutiprocessing mit mehreren Dataframes

Bin über folgendem Beitrag gestoßen:
https://stackoverflow.com/questions/573 … -eof-error

Geprüft ob, wie im Beitrag beschrieben, der Python-Prozess gekillt wird.

dmesg | egrep -i "killed process"
...
[ 5251.893141] Out of memory: Killed process 5990 (python) total-vm:11603572kB, anon-rss:6828224kB, file-rss:0kB, shmem-rss:0kB

Ich glaube... ich habe meine Antwort hmm

Beitrag geändert von BigLin (13.03.2020 21:20:42)

Offline

Schnellantwort auf dieses Thema

Schreibe deinen Beitrag und versende ihn
Deine Antwort

Fußzeile des Forums